2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 #include <common/futex.h>
24 #include <common/macros.h>
28 #include "testpoint.h"
30 #include "health-sessiond.h"
31 #include "lttng-sessiond.h"
/*
 * State shared with the UST registration dispatch thread; allocated and
 * filled in by launch_ust_dispatch_thread().
 *
 * NOTE(review): this listing appears to be a lossy extraction. The leading
 * numbers look like original line numbers and some lines (e.g. the closing
 * brace of this struct) are not present. Restore from upstream before building.
 */
34 struct thread_notifiers
{
/* Command queue the dispatch thread dequeues from and futex-waits on. */
35 struct ust_cmd_queue
*ust_cmd_queue
;
/* Pipe write end used to hand an application's command socket to the apps thread. */
36 int apps_cmd_pipe_write_fd
;
/* Pipe write end used to hand an application's notify socket to the notify thread. */
37 int apps_cmd_notify_pipe_write_fd
;
/* Set to 1 by shutdown_ust_dispatch_thread(); checked by the dispatch loop. */
38 int dispatch_thread_exit
;
42 * For each tracing session, update newly registered apps. The session list
43 * lock MUST be acquired before calling this.
/*
 * update_ust_app(app_sock): look up the ust_app registered on app_sock and
 * call ust_app_global_update() for it against each active session's
 * ust_session. Nothing is updated when the consumer daemon state is
 * CONSUMER_ERROR.
 *
 * NOTE(review): this listing appears to be a lossy extraction. Leading numbers
 * look like original line numbers. Several lines are not present: braces, the
 * declaration of `app`, the early-exit/skip statements and whatever follows
 * a successful session_get(). Restore from upstream before building.
 */
45 static void update_ust_app(int app_sock
)
47 struct ltt_session
*sess
, *stmp
;
48 const struct ltt_session_list
*session_list
= session_get_list();
50 /* Consumer is in an ERROR state. Stop any application update. */
51 if (uatomic_read(&ust_consumerd_state
) == CONSUMER_ERROR
) {
52 /* Stop the update process since the consumer is dead. */
56 /* For all tracing session(s) */
57 cds_list_for_each_entry_safe(sess
, stmp
, &session_list
->head
, list
) {
/* Take a reference on the session; the failure branch body is not visible here. */
60 if (!session_get(sess
)) {
/* Inactive sessions and sessions without a UST session are presumably skipped. */
64 if (!sess
->active
|| !sess
->ust_session
) {
/* Resolve the socket to its ust_app; a missing app is only logged (DBG3). */
69 assert(app_sock
>= 0);
70 app
= ust_app_find_by_sock(app_sock
);
73 * Application can be unregistered before so
74 * this is possible hence simply stopping the
77 DBG3("UST app update failed to find app sock %d",
/* Apply this session's UST state to the application. */
81 ust_app_global_update(sess
->ust_session
, app
);
91 * Sanitize the wait queue of the dispatch registration thread meaning removing
92 * invalid nodes from it. This is to avoid memory leaks for the case the UST
93 * notify socket is never received.
/*
 * sanitize_wait_queue(wait_queue): non-blocking sweep of the wait queue.
 * Each queued application's `sock` is added to a temporary poll set that
 * watches only LPOLLHUP | LPOLLERR, and the set is polled with a zero timeout.
 * Every node whose socket reported hang-up/error is unlinked and its ust_app
 * destroyed. Any other event is reported with ERR(). An empty queue is skipped.
 *
 * NOTE(review): this listing appears to be a lossy extraction. Leading numbers
 * look like original line numbers, and braces, error gotos and labels are not
 * present. In the visible lines `nb_fd` is only initialised to 0 and never
 * assigned from lttng_poll_wait(). `fd_added` is declared but not used.
 * Restore from upstream before building.
 */
95 static void sanitize_wait_queue(struct ust_reg_wait_queue
*wait_queue
)
97 int ret
, nb_fd
= 0, i
;
98 unsigned int fd_added
= 0;
99 struct lttng_poll_event events
;
100 struct ust_reg_wait_node
*wait_node
= NULL
, *tmp_wait_node
;
104 lttng_poll_init(&events
);
106 /* Just skip everything for an empty queue. */
107 if (!wait_queue
->count
) {
/* Size the poll set to the number of queued nodes. */
111 ret
= lttng_poll_create(&events
, wait_queue
->count
, LTTNG_CLOEXEC
);
/* Watch every queued application's socket for hang-up/error only. */
116 cds_list_for_each_entry_safe(wait_node
, tmp_wait_node
,
117 &wait_queue
->head
, head
) {
118 assert(wait_node
->app
);
119 ret
= lttng_poll_add(&events
, wait_node
->app
->sock
,
120 LPOLLHUP
| LPOLLERR
);
133 * Poll but don't block so we can quickly identify the faulty events and
134 * clean them afterwards from the wait queue.
136 ret
= lttng_poll_wait(&events
, 0);
/* For each reported fd, find the wait node owning that socket and drop it. */
142 for (i
= 0; i
< nb_fd
; i
++) {
144 uint32_t revents
= LTTNG_POLL_GETEV(&events
, i
);
145 int pollfd
= LTTNG_POLL_GETFD(&events
, i
);
147 cds_list_for_each_entry_safe(wait_node
, tmp_wait_node
,
148 &wait_queue
->head
, head
) {
149 if (pollfd
== wait_node
->app
->sock
&&
150 (revents
& (LPOLLHUP
| LPOLLERR
))) {
151 cds_list_del(&wait_node
->head
);
153 ust_app_destroy(wait_node
->app
);
156 * Silence warning of use-after-free in
157 * cds_list_for_each_entry_safe which uses
158 * __typeof__(*wait_node).
163 ERR("Unexpected poll events %u for sock %d", revents
, pollfd
);
/*
 * NOTE(review): nb_fd counts fds that reported events, which is not
 * necessarily the number of nodes actually removed above.
 */
170 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd
);
174 lttng_poll_clean(&events
);
178 lttng_poll_clean(&events
);
180 ERR("Unable to sanitize wait queue");
185 * Send a socket to a thread This is called from the dispatch UST registration
186 * thread once all sockets are set for the application.
188 * The sock value can be invalid, we don't really care, the thread will handle
189 * it and make the necessary cleanup if so.
191 * On success, return 0 else a negative value being the errno message of the
/*
 * send_socket_to_thread(fd, sock): write the integer value of `sock` with
 * lttng_write() into the pipe write end `fd`. This hands the socket to the
 * thread reading the other end of that pipe. Per the comment above, it
 * returns 0 on success or a negative value on failure.
 *
 * NOTE(review): this listing appears to be a lossy extraction. The declaration
 * of `ret`, the handling of an invalid (-1) fd, the error label and the return
 * statement are not visible here. Restore from upstream before building.
 */
194 static int send_socket_to_thread(int fd
, int sock
)
199 * It's possible that the FD is set as invalid with -1 concurrently just
200 * before calling this function being a shutdown state of the thread.
207 ret
= lttng_write(fd
, &sock
, sizeof(sock
));
/*
 * NOTE(review): if `ret` is a signed type (e.g. ssize_t), this comparison is
 * evaluated after converting ret to size_t. A -1 error return from
 * lttng_write() then becomes a huge unsigned value that is NOT
 * < sizeof(sock), so the failure would go undetected. Confirm ret's
 * declaration and consider `ret < 0 || (size_t) ret < sizeof(sock)`.
 */
208 if (ret
< sizeof(sock
)) {
209 PERROR("write apps pipe %d", fd
);
216 /* All good. Don't send back the write positive ret value. */
/*
 * cleanup_ust_dispatch_thread(data): cleanup callback passed to
 * lttng_thread_create() by launch_ust_dispatch_thread(). Its body is missing
 * from this listing.
 * NOTE(review): `data` is presumably the struct thread_notifiers allocated in
 * launch_ust_dispatch_thread() and this callback presumably frees it. Confirm
 * against upstream.
 */
222 static void cleanup_ust_dispatch_thread(void *data
)
228 * Dispatch request from the registration threads to the application
229 * communication thread.
/*
 * thread_dispatch_ust_registration(data): body of the "UST registration
 * dispatch" thread; `data` is the struct thread_notifiers.
 *
 * Main loop:
 * - Prepare the command-queue futex and stop if dispatch_thread_exit is set.
 * - Otherwise drain the command queue.
 *   - A USTCTL_SOCKET_CMD command creates a ust_app and parks it in the local
 *     wait_queue.
 *   - Any other command is matched by pid against the wait queue and supplies
 *     that app's notify socket. If no app matches, the socket is closed.
 * - Once both sockets are known, the session list is locked. The notify
 *   socket and then the command socket are handed to their threads through
 *   the pipes in `notifiers`, with update_ust_app() and
 *   ust_app_register_done() run in between.
 * - When the queue is empty, the thread blocks in futex_nto1_wait().
 *
 * On exit, the local wait queue is emptied. Every command still queued has
 * its socket closed and its LTTNG_FD_APPS slot released.
 *
 * NOTE(review): this listing appears to be a lossy extraction. Leading numbers
 * look like original line numbers, and many lines are not present (braces,
 * labels, frees, gotos/continues, the declaration of `ret`). Restore from
 * upstream before building.
 */
231 static void *thread_dispatch_ust_registration(void *data
)
234 struct cds_wfcq_node
*node
;
235 struct ust_command
*ust_cmd
= NULL
;
236 struct ust_reg_wait_node
*wait_node
= NULL
, *tmp_wait_node
;
237 struct ust_reg_wait_queue wait_queue
= {
240 struct thread_notifiers
*notifiers
= data
;
242 rcu_register_thread();
244 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH
);
246 if (testpoint(sessiond_thread_app_reg_dispatch
)) {
247 goto error_testpoint
;
250 health_code_update();
252 CDS_INIT_LIST_HEAD(&wait_queue
.head
);
254 DBG("[thread] Dispatch UST command started");
257 health_code_update();
259 /* Atomically prepare the queue futex */
260 futex_nto1_prepare(&notifiers
->ust_cmd_queue
->futex
);
/* Set by shutdown_ust_dispatch_thread(). */
262 if (CMM_LOAD_SHARED(notifiers
->dispatch_thread_exit
)) {
267 struct ust_app
*app
= NULL
;
271 * Make sure we don't have node(s) that have hung up before receiving
272 * the notify socket. This is to clean the list in order to avoid
273 * memory leaks from notify socket that are never seen.
275 sanitize_wait_queue(&wait_queue
);
277 health_code_update();
278 /* Dequeue command for registration */
279 node
= cds_wfcq_dequeue_blocking(
280 &notifiers
->ust_cmd_queue
->head
,
281 &notifiers
->ust_cmd_queue
->tail
);
283 DBG("Woken up but nothing in the UST command queue");
284 /* Continue thread execution */
288 ust_cmd
= caa_container_of(node
, struct ust_command
, node
);
290 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
291 " gid:%d sock:%d name:%s (version %d.%d)",
292 ust_cmd
->reg_msg
.pid
, ust_cmd
->reg_msg
.ppid
,
293 ust_cmd
->reg_msg
.uid
, ust_cmd
->reg_msg
.gid
,
294 ust_cmd
->sock
, ust_cmd
->reg_msg
.name
,
295 ust_cmd
->reg_msg
.major
, ust_cmd
->reg_msg
.minor
);
/* Command socket: allocate a wait node and create the ust_app from reg_msg. */
297 if (ust_cmd
->reg_msg
.type
== USTCTL_SOCKET_CMD
) {
298 wait_node
= zmalloc(sizeof(*wait_node
));
300 PERROR("zmalloc wait_node dispatch");
301 ret
= close(ust_cmd
->sock
);
303 PERROR("close ust sock dispatch %d", ust_cmd
->sock
);
305 lttng_fd_put(LTTNG_FD_APPS
, 1);
310 CDS_INIT_LIST_HEAD(&wait_node
->head
);
312 /* Create application object if socket is CMD. */
313 wait_node
->app
= ust_app_create(&ust_cmd
->reg_msg
,
315 if (!wait_node
->app
) {
316 ret
= close(ust_cmd
->sock
);
318 PERROR("close ust sock dispatch %d", ust_cmd
->sock
);
320 lttng_fd_put(LTTNG_FD_APPS
, 1);
328 * Add application to the wait queue so we can set the notify
329 * socket before putting this object in the global ht.
331 cds_list_add(&wait_node
->head
, &wait_queue
.head
);
337 * We have to continue here since we don't have the notify
338 * socket and the application MUST be added to the hash table
339 * only at that moment.
344 * Look for the application in the local wait queue and set the
345 * notify socket if found.
347 cds_list_for_each_entry_safe(wait_node
, tmp_wait_node
,
348 &wait_queue
.head
, head
) {
349 health_code_update();
350 if (wait_node
->app
->pid
== ust_cmd
->reg_msg
.pid
) {
351 wait_node
->app
->notify_sock
= ust_cmd
->sock
;
352 cds_list_del(&wait_node
->head
);
354 app
= wait_node
->app
;
357 DBG3("UST app notify socket %d is set", ust_cmd
->sock
);
363 * With no application at this stage the received socket is
364 * basically useless so close it before we free the cmd data
365 * structure for good.
368 ret
= close(ust_cmd
->sock
);
370 PERROR("close ust sock dispatch %d", ust_cmd
->sock
);
372 lttng_fd_put(LTTNG_FD_APPS
, 1);
382 * Lock the global session list so from the register up to the
383 * registration done message, no thread can see the application
384 * and change its state.
390 * Add application to the global hash table. This needs to be
391 * done before the update to the UST registry can locate the
396 /* Set app version. This call will print an error if needed. */
397 (void) ust_app_version(app
);
399 /* Send notify socket through the notify pipe. */
400 ret
= send_socket_to_thread(
401 notifiers
->apps_cmd_notify_pipe_write_fd
,
405 session_unlock_list();
407 * No notify thread, stop the UST tracing. However, this is
408 * not an internal error of the this thread thus setting
409 * the health error code to a normal exit.
416 * Update newly registered application with the tracing
417 * registry info already enabled information.
419 update_ust_app(app
->sock
);
422 * Don't care about return value. Let the manage apps threads
423 * handle app unregistration upon socket close.
425 (void) ust_app_register_done(app
);
428 * Even if the application socket has been closed, send the app
429 * to the thread and unregistration will take place at that
432 ret
= send_socket_to_thread(
433 notifiers
->apps_cmd_pipe_write_fd
,
437 session_unlock_list();
439 * No apps. thread, stop the UST tracing. However, this is
440 * not an internal error of the this thread thus setting
441 * the health error code to a normal exit.
448 session_unlock_list();
450 } while (node
!= NULL
);
453 /* Futex wait on queue. Blocking call on futex() */
454 futex_nto1_wait(&notifiers
->ust_cmd_queue
->futex
);
457 /* Normal exit, no error */
461 /* Clean up wait queue. */
462 cds_list_for_each_entry_safe(wait_node
, tmp_wait_node
,
463 &wait_queue
.head
, head
) {
464 cds_list_del(&wait_node
->head
);
469 /* Empty command queue. */
471 /* Dequeue command for registration */
472 node
= cds_wfcq_dequeue_blocking(
473 &notifiers
->ust_cmd_queue
->head
,
474 &notifiers
->ust_cmd_queue
->tail
);
478 ust_cmd
= caa_container_of(node
, struct ust_command
, node
);
479 ret
= close(ust_cmd
->sock
);
481 PERROR("close ust sock exit dispatch %d", ust_cmd
->sock
);
483 lttng_fd_put(LTTNG_FD_APPS
, 1);
488 DBG("Dispatch thread dying");
491 ERR("Health error occurred in %s", __func__
);
493 health_unregister(health_sessiond
);
494 rcu_unregister_thread();
/*
 * shutdown_ust_dispatch_thread(data): shutdown callback passed to
 * lttng_thread_create() by launch_ust_dispatch_thread().
 *
 * It sets dispatch_thread_exit and then wakes the dispatch thread from its
 * futex wait on the command queue, so the thread re-checks the flag and exits.
 *
 * NOTE(review): this listing appears to be a lossy extraction. The braces and
 * the return statement of this bool function are not present. Restore from
 * upstream before building.
 */
498 static bool shutdown_ust_dispatch_thread(void *data
)
500 struct thread_notifiers
*notifiers
= data
;
502 CMM_STORE_SHARED(notifiers
->dispatch_thread_exit
, 1);
503 futex_nto1_wake(&notifiers
->ust_cmd_queue
->futex
);
/*
 * launch_ust_dispatch_thread(cmd_queue, apps_cmd_pipe_write_fd,
 * apps_cmd_notify_pipe_write_fd): start the UST registration dispatch thread.
 *
 * It allocates a struct thread_notifiers and records the command queue and
 * the two pipe write ends in it. It then creates the
 * "UST registration dispatch" thread with its run, shutdown and cleanup
 * callbacks. Returns a bool; the exact return paths are not visible here.
 *
 * NOTE(review): this listing appears to be a lossy extraction. Original lines
 * 515-517 (presumably a NULL check on `notifiers`) are missing, as are the
 * remaining lttng_thread_create() argument(s), the creation-failure path and
 * the return statements. Restore from upstream before building.
 */
507 bool launch_ust_dispatch_thread(struct ust_cmd_queue
*cmd_queue
,
508 int apps_cmd_pipe_write_fd
,
509 int apps_cmd_notify_pipe_write_fd
)
511 struct lttng_thread
*thread
;
512 struct thread_notifiers
*notifiers
;
514 notifiers
= zmalloc(sizeof(*notifiers
));
518 notifiers
->ust_cmd_queue
= cmd_queue
;
519 notifiers
->apps_cmd_pipe_write_fd
= apps_cmd_pipe_write_fd
;
520 notifiers
->apps_cmd_notify_pipe_write_fd
= apps_cmd_notify_pipe_write_fd
;
522 thread
= lttng_thread_create("UST registration dispatch",
523 thread_dispatch_ust_registration
,
524 shutdown_ust_dispatch_thread
,
525 cleanup_ust_dispatch_thread
,
/* Presumably releases this function's reference to the thread object. */
530 lttng_thread_put(thread
);