Fix: sessiond: ust session is inactive during ust_app_global_update
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.c
CommitLineData
dd5875e7
JG
1/*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#include <stddef.h>
21#include <stdlib.h>
22#include <urcu.h>
23#include <common/futex.h>
24#include <common/macros.h>
25
26#include "dispatch.h"
27#include "ust-app.h"
28#include "testpoint.h"
29#include "fd-limit.h"
30#include "health-sessiond.h"
31#include "lttng-sessiond.h"
32#include "thread.h"
33
/*
 * State handed to the UST registration dispatch thread at launch time and
 * shared with its shutdown callback.
 */
struct thread_notifiers {
	/* Registration command queue drained by the dispatch thread. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Pipe write end used to pass application command sockets on. */
	int apps_cmd_pipe_write_fd;
	/* Pipe write end used to pass application notify sockets on. */
	int apps_cmd_notify_pipe_write_fd;
	/* Non-zero once the thread has been asked to exit. */
	int dispatch_thread_exit;
};
40
41/*
42 * For each tracing session, update newly registered apps. The session list
43 * lock MUST be acquired before calling this.
44 */
45static void update_ust_app(int app_sock)
46{
47 struct ltt_session *sess, *stmp;
48 const struct ltt_session_list *session_list = session_get_list();
49
50 /* Consumer is in an ERROR state. Stop any application update. */
51 if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
52 /* Stop the update process since the consumer is dead. */
53 return;
54 }
55
56 /* For all tracing session(s) */
57 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
58 struct ust_app *app;
59
60 if (!session_get(sess)) {
61 continue;
62 }
63 session_lock(sess);
f559728d
JR
64 if (!sess->active || !sess->ust_session ||
65 !sess->ust_session->active) {
dd5875e7
JG
66 goto unlock_session;
67 }
68
69 rcu_read_lock();
70 assert(app_sock >= 0);
71 app = ust_app_find_by_sock(app_sock);
72 if (app == NULL) {
73 /*
74 * Application can be unregistered before so
75 * this is possible hence simply stopping the
76 * update.
77 */
78 DBG3("UST app update failed to find app sock %d",
79 app_sock);
80 goto unlock_rcu;
81 }
82 ust_app_global_update(sess->ust_session, app);
83 unlock_rcu:
84 rcu_read_unlock();
85 unlock_session:
86 session_unlock(sess);
87 session_put(sess);
88 }
89}
90
91/*
92 * Sanitize the wait queue of the dispatch registration thread meaning removing
93 * invalid nodes from it. This is to avoid memory leaks for the case the UST
94 * notify socket is never received.
95 */
96static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
97{
98 int ret, nb_fd = 0, i;
99 unsigned int fd_added = 0;
100 struct lttng_poll_event events;
101 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
102
103 assert(wait_queue);
104
105 lttng_poll_init(&events);
106
107 /* Just skip everything for an empty queue. */
108 if (!wait_queue->count) {
109 goto end;
110 }
111
112 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
113 if (ret < 0) {
114 goto error_create;
115 }
116
117 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
118 &wait_queue->head, head) {
119 assert(wait_node->app);
120 ret = lttng_poll_add(&events, wait_node->app->sock,
121 LPOLLHUP | LPOLLERR);
122 if (ret < 0) {
123 goto error;
124 }
125
126 fd_added = 1;
127 }
128
129 if (!fd_added) {
130 goto end;
131 }
132
133 /*
134 * Poll but don't block so we can quickly identify the faulty events and
135 * clean them afterwards from the wait queue.
136 */
137 ret = lttng_poll_wait(&events, 0);
138 if (ret < 0) {
139 goto error;
140 }
141 nb_fd = ret;
142
143 for (i = 0; i < nb_fd; i++) {
144 /* Get faulty FD. */
145 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
146 int pollfd = LTTNG_POLL_GETFD(&events, i);
147
dd5875e7
JG
148 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
149 &wait_queue->head, head) {
150 if (pollfd == wait_node->app->sock &&
151 (revents & (LPOLLHUP | LPOLLERR))) {
152 cds_list_del(&wait_node->head);
153 wait_queue->count--;
154 ust_app_destroy(wait_node->app);
155 free(wait_node);
156 /*
157 * Silence warning of use-after-free in
158 * cds_list_for_each_entry_safe which uses
159 * __typeof__(*wait_node).
160 */
161 wait_node = NULL;
162 break;
163 } else {
164 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
165 goto error;
166 }
167 }
168 }
169
170 if (nb_fd > 0) {
171 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
172 }
173
174end:
175 lttng_poll_clean(&events);
176 return;
177
178error:
179 lttng_poll_clean(&events);
180error_create:
181 ERR("Unable to sanitize wait queue");
182 return;
183}
184
/*
 * Send a socket to a thread. This is called from the dispatch UST
 * registration thread once all sockets are set for the application.
 *
 * The sock value can be invalid, we don't really care, the thread will handle
 * it and make the necessary cleanup if so.
 *
 * On success, return 0. On failure, return a negative errno value:
 *   -EBADF  if @fd is invalid (e.g. reset to -1 on shutdown),
 *   -errno  of the failed write(),
 *   -EIO    if the write was short (or failed without setting errno).
 */
static int send_socket_to_thread(int fd, int sock)
{
	ssize_t ret;

	/*
	 * It's possible that the FD is set as invalid with -1 concurrently just
	 * before calling this function being a shutdown state of the thread.
	 */
	if (fd < 0) {
		ret = -EBADF;
		goto error;
	}

	ret = lttng_write(fd, &sock, sizeof(sock));
	/*
	 * Compare as signed: comparing the ssize_t result directly against
	 * sizeof() converts -1 to SIZE_MAX, which would hide write errors.
	 */
	if (ret < (ssize_t) sizeof(sock)) {
		/* Save errno before logging, which may clobber it. */
		const int write_errno = errno;

		PERROR("write apps pipe %d", fd);
		if (ret < 0 && write_errno != 0) {
			ret = -write_errno;
		} else {
			/* Short write: callers only treat negative values as errors. */
			ret = -EIO;
		}
		goto error;
	}

	/* All good. Don't send back the write positive ret value. */
	ret = 0;
error:
	return (int) ret;
}
222
/*
 * Cleanup callback of the dispatch thread: release the notifiers structure
 * that was passed as the thread's data.
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	void *const notifiers = data;

	free(notifiers);
}
227
228/*
229 * Dispatch request from the registration threads to the application
230 * communication thread.
231 */
232static void *thread_dispatch_ust_registration(void *data)
233{
234 int ret, err = -1;
235 struct cds_wfcq_node *node;
236 struct ust_command *ust_cmd = NULL;
237 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
238 struct ust_reg_wait_queue wait_queue = {
239 .count = 0,
240 };
241 struct thread_notifiers *notifiers = data;
242
243 rcu_register_thread();
244
245 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
246
247 if (testpoint(sessiond_thread_app_reg_dispatch)) {
248 goto error_testpoint;
249 }
250
251 health_code_update();
252
253 CDS_INIT_LIST_HEAD(&wait_queue.head);
254
255 DBG("[thread] Dispatch UST command started");
256
257 for (;;) {
258 health_code_update();
259
260 /* Atomically prepare the queue futex */
261 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
262
263 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
264 break;
265 }
266
267 do {
268 struct ust_app *app = NULL;
269 ust_cmd = NULL;
270
271 /*
272 * Make sure we don't have node(s) that have hung up before receiving
273 * the notify socket. This is to clean the list in order to avoid
274 * memory leaks from notify socket that are never seen.
275 */
276 sanitize_wait_queue(&wait_queue);
277
278 health_code_update();
279 /* Dequeue command for registration */
280 node = cds_wfcq_dequeue_blocking(
281 &notifiers->ust_cmd_queue->head,
282 &notifiers->ust_cmd_queue->tail);
283 if (node == NULL) {
284 DBG("Woken up but nothing in the UST command queue");
285 /* Continue thread execution */
286 break;
287 }
288
289 ust_cmd = caa_container_of(node, struct ust_command, node);
290
291 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
292 " gid:%d sock:%d name:%s (version %d.%d)",
293 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
294 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
295 ust_cmd->sock, ust_cmd->reg_msg.name,
296 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
297
298 if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
299 wait_node = zmalloc(sizeof(*wait_node));
300 if (!wait_node) {
301 PERROR("zmalloc wait_node dispatch");
302 ret = close(ust_cmd->sock);
303 if (ret < 0) {
304 PERROR("close ust sock dispatch %d", ust_cmd->sock);
305 }
306 lttng_fd_put(LTTNG_FD_APPS, 1);
307 free(ust_cmd);
3577d7d8 308 ust_cmd = NULL;
dd5875e7
JG
309 goto error;
310 }
311 CDS_INIT_LIST_HEAD(&wait_node->head);
312
313 /* Create application object if socket is CMD. */
314 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
315 ust_cmd->sock);
316 if (!wait_node->app) {
317 ret = close(ust_cmd->sock);
318 if (ret < 0) {
319 PERROR("close ust sock dispatch %d", ust_cmd->sock);
320 }
321 lttng_fd_put(LTTNG_FD_APPS, 1);
322 free(wait_node);
3577d7d8 323 wait_node = NULL;
dd5875e7 324 free(ust_cmd);
3577d7d8 325 ust_cmd = NULL;
dd5875e7
JG
326 continue;
327 }
328 /*
329 * Add application to the wait queue so we can set the notify
330 * socket before putting this object in the global ht.
331 */
332 cds_list_add(&wait_node->head, &wait_queue.head);
333 wait_queue.count++;
334
335 free(ust_cmd);
3577d7d8 336 ust_cmd = NULL;
dd5875e7
JG
337 /*
338 * We have to continue here since we don't have the notify
339 * socket and the application MUST be added to the hash table
340 * only at that moment.
341 */
342 continue;
343 } else {
344 /*
345 * Look for the application in the local wait queue and set the
346 * notify socket if found.
347 */
348 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
349 &wait_queue.head, head) {
350 health_code_update();
351 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
352 wait_node->app->notify_sock = ust_cmd->sock;
353 cds_list_del(&wait_node->head);
354 wait_queue.count--;
355 app = wait_node->app;
356 free(wait_node);
3577d7d8 357 wait_node = NULL;
dd5875e7
JG
358 DBG3("UST app notify socket %d is set", ust_cmd->sock);
359 break;
360 }
361 }
362
363 /*
364 * With no application at this stage the received socket is
365 * basically useless so close it before we free the cmd data
366 * structure for good.
367 */
368 if (!app) {
369 ret = close(ust_cmd->sock);
370 if (ret < 0) {
371 PERROR("close ust sock dispatch %d", ust_cmd->sock);
372 }
373 lttng_fd_put(LTTNG_FD_APPS, 1);
374 }
375 free(ust_cmd);
3577d7d8 376 ust_cmd = NULL;
dd5875e7
JG
377 }
378
379 if (app) {
380 /*
381 * @session_lock_list
382 *
383 * Lock the global session list so from the register up to the
384 * registration done message, no thread can see the application
385 * and change its state.
386 */
387 session_lock_list();
388 rcu_read_lock();
389
390 /*
391 * Add application to the global hash table. This needs to be
392 * done before the update to the UST registry can locate the
393 * application.
394 */
395 ust_app_add(app);
396
397 /* Set app version. This call will print an error if needed. */
398 (void) ust_app_version(app);
399
400 /* Send notify socket through the notify pipe. */
401 ret = send_socket_to_thread(
402 notifiers->apps_cmd_notify_pipe_write_fd,
403 app->notify_sock);
404 if (ret < 0) {
405 rcu_read_unlock();
406 session_unlock_list();
407 /*
408 * No notify thread, stop the UST tracing. However, this is
409 * not an internal error of the this thread thus setting
410 * the health error code to a normal exit.
411 */
412 err = 0;
413 goto error;
414 }
415
416 /*
417 * Update newly registered application with the tracing
418 * registry info already enabled information.
419 */
420 update_ust_app(app->sock);
421
422 /*
423 * Don't care about return value. Let the manage apps threads
424 * handle app unregistration upon socket close.
425 */
426 (void) ust_app_register_done(app);
427
428 /*
429 * Even if the application socket has been closed, send the app
430 * to the thread and unregistration will take place at that
431 * place.
432 */
433 ret = send_socket_to_thread(
434 notifiers->apps_cmd_pipe_write_fd,
435 app->sock);
436 if (ret < 0) {
437 rcu_read_unlock();
438 session_unlock_list();
439 /*
440 * No apps. thread, stop the UST tracing. However, this is
441 * not an internal error of the this thread thus setting
442 * the health error code to a normal exit.
443 */
444 err = 0;
445 goto error;
446 }
447
448 rcu_read_unlock();
449 session_unlock_list();
450 }
451 } while (node != NULL);
452
453 health_poll_entry();
454 /* Futex wait on queue. Blocking call on futex() */
455 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
456 health_poll_exit();
457 }
458 /* Normal exit, no error */
459 err = 0;
460
461error:
462 /* Clean up wait queue. */
463 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
464 &wait_queue.head, head) {
465 cds_list_del(&wait_node->head);
466 wait_queue.count--;
467 free(wait_node);
468 }
469
470 /* Empty command queue. */
471 for (;;) {
472 /* Dequeue command for registration */
473 node = cds_wfcq_dequeue_blocking(
474 &notifiers->ust_cmd_queue->head,
475 &notifiers->ust_cmd_queue->tail);
476 if (node == NULL) {
477 break;
478 }
479 ust_cmd = caa_container_of(node, struct ust_command, node);
480 ret = close(ust_cmd->sock);
481 if (ret < 0) {
482 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
483 }
484 lttng_fd_put(LTTNG_FD_APPS, 1);
485 free(ust_cmd);
486 }
487
488error_testpoint:
489 DBG("Dispatch thread dying");
490 if (err) {
491 health_error();
492 ERR("Health error occurred in %s", __func__);
493 }
494 health_unregister(health_sessiond);
495 rcu_unregister_thread();
496 return NULL;
497}
498
499static bool shutdown_ust_dispatch_thread(void *data)
500{
501 struct thread_notifiers *notifiers = data;
502
503 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
504 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
505 return true;
506}
507
508bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
509 int apps_cmd_pipe_write_fd,
510 int apps_cmd_notify_pipe_write_fd)
511{
512 struct lttng_thread *thread;
513 struct thread_notifiers *notifiers;
514
515 notifiers = zmalloc(sizeof(*notifiers));
516 if (!notifiers) {
517 goto error;
518 }
519 notifiers->ust_cmd_queue = cmd_queue;
520 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
521 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
522
523 thread = lttng_thread_create("UST registration dispatch",
524 thread_dispatch_ust_registration,
525 shutdown_ust_dispatch_thread,
526 cleanup_ust_dispatch_thread,
527 notifiers);
528 if (!thread) {
529 goto error;
530 }
531 lttng_thread_put(thread);
532 return true;
533error:
534 free(notifiers);
535 return false;
536}
This page took 0.045707 seconds and 4 git commands to generate.