consumerd: send a buffer static sample on flush command
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.cpp
CommitLineData
5d1b0219 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5d1b0219 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
5d1b0219 7 *
5d1b0219
JG
8 */
9
10#include <stddef.h>
11#include <stdlib.h>
12#include <urcu.h>
c9e313bc
SM
13#include <common/futex.hpp>
14#include <common/macros.hpp>
5d1b0219 15
c9e313bc
SM
16#include "dispatch.hpp"
17#include "ust-app.hpp"
18#include "testpoint.hpp"
19#include "fd-limit.hpp"
20#include "health-sessiond.hpp"
21#include "lttng-sessiond.hpp"
22#include "thread.hpp"
5d1b0219 23
namespace {
/*
 * State handed to the UST registration dispatch thread and to its shutdown
 * and cleanup callbacks.
 */
struct thread_notifiers {
	/* Queue of UST registration commands consumed by the dispatch thread. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Pipe write end on which application command sockets are sent. */
	int apps_cmd_pipe_write_fd;
	/* Pipe write end on which application notify sockets are sent. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set to 1 by the shutdown callback to make the dispatch loop exit. */
	int dispatch_thread_exit;
};
} /* namespace */
5d1b0219
JG
32
33/*
34 * For each tracing session, update newly registered apps. The session list
35 * lock MUST be acquired before calling this.
36 */
37static void update_ust_app(int app_sock)
38{
39 struct ltt_session *sess, *stmp;
40 const struct ltt_session_list *session_list = session_get_list();
70670472 41 struct ust_app *app;
5d1b0219
JG
42
43 /* Consumer is in an ERROR state. Stop any application update. */
412d7227 44 if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) {
5d1b0219
JG
45 /* Stop the update process since the consumer is dead. */
46 return;
47 }
48
70670472 49 rcu_read_lock();
a0377dfe 50 LTTNG_ASSERT(app_sock >= 0);
70670472
JR
51 app = ust_app_find_by_sock(app_sock);
52 if (app == NULL) {
53 /*
54 * Application can be unregistered before so
55 * this is possible hence simply stopping the
56 * update.
57 */
58 DBG3("UST app update failed to find app sock %d",
59 app_sock);
60 goto unlock_rcu;
61 }
62
63 /* Update all event notifiers for the app. */
64 ust_app_global_update_event_notifier_rules(app);
65
5d1b0219
JG
66 /* For all tracing session(s) */
67 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
5d1b0219
JG
68 if (!session_get(sess)) {
69 continue;
70 }
71 session_lock(sess);
8f4456a8
JR
72 if (!sess->active || !sess->ust_session ||
73 !sess->ust_session->active) {
5d1b0219
JG
74 goto unlock_session;
75 }
76
5d1b0219 77 ust_app_global_update(sess->ust_session, app);
5d1b0219
JG
78 unlock_session:
79 session_unlock(sess);
80 session_put(sess);
81 }
70670472
JR
82
83unlock_rcu:
84 rcu_read_unlock();
5d1b0219
JG
85}
86
87/*
88 * Sanitize the wait queue of the dispatch registration thread meaning removing
89 * invalid nodes from it. This is to avoid memory leaks for the case the UST
90 * notify socket is never received.
91 */
92static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
93{
94 int ret, nb_fd = 0, i;
95 unsigned int fd_added = 0;
96 struct lttng_poll_event events;
97 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
98
a0377dfe 99 LTTNG_ASSERT(wait_queue);
5d1b0219
JG
100
101 lttng_poll_init(&events);
102
103 /* Just skip everything for an empty queue. */
104 if (!wait_queue->count) {
105 goto end;
106 }
107
108 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
109 if (ret < 0) {
110 goto error_create;
111 }
112
113 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
114 &wait_queue->head, head) {
a0377dfe 115 LTTNG_ASSERT(wait_node->app);
5d1b0219
JG
116 ret = lttng_poll_add(&events, wait_node->app->sock,
117 LPOLLHUP | LPOLLERR);
118 if (ret < 0) {
119 goto error;
120 }
121
122 fd_added = 1;
123 }
124
125 if (!fd_added) {
126 goto end;
127 }
128
129 /*
130 * Poll but don't block so we can quickly identify the faulty events and
131 * clean them afterwards from the wait queue.
132 */
133 ret = lttng_poll_wait(&events, 0);
134 if (ret < 0) {
135 goto error;
136 }
137 nb_fd = ret;
138
139 for (i = 0; i < nb_fd; i++) {
140 /* Get faulty FD. */
141 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
142 int pollfd = LTTNG_POLL_GETFD(&events, i);
143
5d1b0219
JG
144 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
145 &wait_queue->head, head) {
146 if (pollfd == wait_node->app->sock &&
147 (revents & (LPOLLHUP | LPOLLERR))) {
148 cds_list_del(&wait_node->head);
149 wait_queue->count--;
150 ust_app_destroy(wait_node->app);
151 free(wait_node);
152 /*
153 * Silence warning of use-after-free in
154 * cds_list_for_each_entry_safe which uses
155 * __typeof__(*wait_node).
156 */
157 wait_node = NULL;
158 break;
159 } else {
160 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
161 goto error;
162 }
163 }
164 }
165
166 if (nb_fd > 0) {
167 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
168 }
169
170end:
171 lttng_poll_clean(&events);
172 return;
173
174error:
175 lttng_poll_clean(&events);
176error_create:
177 ERR("Unable to sanitize wait queue");
178 return;
179}
180
181/*
182 * Send a socket to a thread This is called from the dispatch UST registration
183 * thread once all sockets are set for the application.
184 *
185 * The sock value can be invalid, we don't really care, the thread will handle
186 * it and make the necessary cleanup if so.
187 *
188 * On success, return 0 else a negative value being the errno message of the
189 * write().
190 */
191static int send_socket_to_thread(int fd, int sock)
192{
193 ssize_t ret;
194
195 /*
196 * It's possible that the FD is set as invalid with -1 concurrently just
197 * before calling this function being a shutdown state of the thread.
198 */
199 if (fd < 0) {
200 ret = -EBADF;
201 goto error;
202 }
203
204 ret = lttng_write(fd, &sock, sizeof(sock));
205 if (ret < sizeof(sock)) {
206 PERROR("write apps pipe %d", fd);
207 if (ret < 0) {
208 ret = -errno;
209 }
210 goto error;
211 }
212
213 /* All good. Don't send back the write positive ret value. */
214 ret = 0;
215error:
216 return (int) ret;
217}
218
/*
 * Cleanup callback of the dispatch thread: release the notifiers structure
 * that was allocated when the thread was launched.
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	free(data);
}
223
224/*
225 * Dispatch request from the registration threads to the application
226 * communication thread.
227 */
228static void *thread_dispatch_ust_registration(void *data)
229{
230 int ret, err = -1;
231 struct cds_wfcq_node *node;
232 struct ust_command *ust_cmd = NULL;
233 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
234 struct ust_reg_wait_queue wait_queue = {
235 .count = 0,
1c9a0b0e 236 .head = {},
5d1b0219 237 };
7966af57 238 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
239
240 rcu_register_thread();
241
412d7227
SM
242 health_register(the_health_sessiond,
243 HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
5d1b0219
JG
244
245 if (testpoint(sessiond_thread_app_reg_dispatch)) {
246 goto error_testpoint;
247 }
248
249 health_code_update();
250
251 CDS_INIT_LIST_HEAD(&wait_queue.head);
252
253 DBG("[thread] Dispatch UST command started");
254
255 for (;;) {
256 health_code_update();
257
258 /* Atomically prepare the queue futex */
259 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
260
261 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
262 break;
263 }
264
265 do {
266 struct ust_app *app = NULL;
267 ust_cmd = NULL;
268
269 /*
270 * Make sure we don't have node(s) that have hung up before receiving
271 * the notify socket. This is to clean the list in order to avoid
272 * memory leaks from notify socket that are never seen.
273 */
274 sanitize_wait_queue(&wait_queue);
275
276 health_code_update();
277 /* Dequeue command for registration */
278 node = cds_wfcq_dequeue_blocking(
279 &notifiers->ust_cmd_queue->head,
280 &notifiers->ust_cmd_queue->tail);
281 if (node == NULL) {
282 DBG("Woken up but nothing in the UST command queue");
283 /* Continue thread execution */
284 break;
285 }
286
0114db0e 287 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
288
289 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
290 " gid:%d sock:%d name:%s (version %d.%d)",
291 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
292 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
293 ust_cmd->sock, ust_cmd->reg_msg.name,
294 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
295
b623cb6a 296 if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) {
64803277 297 wait_node = zmalloc<ust_reg_wait_node>();
5d1b0219
JG
298 if (!wait_node) {
299 PERROR("zmalloc wait_node dispatch");
300 ret = close(ust_cmd->sock);
301 if (ret < 0) {
302 PERROR("close ust sock dispatch %d", ust_cmd->sock);
303 }
304 lttng_fd_put(LTTNG_FD_APPS, 1);
305 free(ust_cmd);
58f835e1 306 ust_cmd = NULL;
5d1b0219
JG
307 goto error;
308 }
309 CDS_INIT_LIST_HEAD(&wait_node->head);
310
311 /* Create application object if socket is CMD. */
312 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
313 ust_cmd->sock);
314 if (!wait_node->app) {
315 ret = close(ust_cmd->sock);
316 if (ret < 0) {
317 PERROR("close ust sock dispatch %d", ust_cmd->sock);
318 }
319 lttng_fd_put(LTTNG_FD_APPS, 1);
320 free(wait_node);
58f835e1 321 wait_node = NULL;
5d1b0219 322 free(ust_cmd);
58f835e1 323 ust_cmd = NULL;
5d1b0219
JG
324 continue;
325 }
326 /*
327 * Add application to the wait queue so we can set the notify
328 * socket before putting this object in the global ht.
329 */
330 cds_list_add(&wait_node->head, &wait_queue.head);
331 wait_queue.count++;
332
333 free(ust_cmd);
58f835e1 334 ust_cmd = NULL;
5d1b0219
JG
335 /*
336 * We have to continue here since we don't have the notify
337 * socket and the application MUST be added to the hash table
338 * only at that moment.
339 */
340 continue;
341 } else {
342 /*
343 * Look for the application in the local wait queue and set the
344 * notify socket if found.
345 */
346 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
347 &wait_queue.head, head) {
348 health_code_update();
349 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
350 wait_node->app->notify_sock = ust_cmd->sock;
351 cds_list_del(&wait_node->head);
352 wait_queue.count--;
353 app = wait_node->app;
354 free(wait_node);
58f835e1 355 wait_node = NULL;
5d1b0219
JG
356 DBG3("UST app notify socket %d is set", ust_cmd->sock);
357 break;
358 }
359 }
360
361 /*
362 * With no application at this stage the received socket is
363 * basically useless so close it before we free the cmd data
364 * structure for good.
365 */
366 if (!app) {
367 ret = close(ust_cmd->sock);
368 if (ret < 0) {
369 PERROR("close ust sock dispatch %d", ust_cmd->sock);
370 }
371 lttng_fd_put(LTTNG_FD_APPS, 1);
372 }
373 free(ust_cmd);
58f835e1 374 ust_cmd = NULL;
5d1b0219
JG
375 }
376
377 if (app) {
378 /*
379 * @session_lock_list
380 *
381 * Lock the global session list so from the register up to the
382 * registration done message, no thread can see the application
383 * and change its state.
384 */
385 session_lock_list();
386 rcu_read_lock();
387
388 /*
389 * Add application to the global hash table. This needs to be
390 * done before the update to the UST registry can locate the
391 * application.
392 */
393 ust_app_add(app);
394
395 /* Set app version. This call will print an error if needed. */
396 (void) ust_app_version(app);
397
da873412
JR
398 (void) ust_app_setup_event_notifier_group(app);
399
5d1b0219
JG
400 /* Send notify socket through the notify pipe. */
401 ret = send_socket_to_thread(
402 notifiers->apps_cmd_notify_pipe_write_fd,
403 app->notify_sock);
404 if (ret < 0) {
405 rcu_read_unlock();
406 session_unlock_list();
407 /*
408 * No notify thread, stop the UST tracing. However, this is
409 * not an internal error of the this thread thus setting
410 * the health error code to a normal exit.
411 */
412 err = 0;
413 goto error;
414 }
415
416 /*
417 * Update newly registered application with the tracing
418 * registry info already enabled information.
419 */
420 update_ust_app(app->sock);
421
422 /*
423 * Don't care about return value. Let the manage apps threads
424 * handle app unregistration upon socket close.
425 */
426 (void) ust_app_register_done(app);
427
428 /*
429 * Even if the application socket has been closed, send the app
430 * to the thread and unregistration will take place at that
431 * place.
432 */
433 ret = send_socket_to_thread(
434 notifiers->apps_cmd_pipe_write_fd,
435 app->sock);
436 if (ret < 0) {
437 rcu_read_unlock();
438 session_unlock_list();
439 /*
440 * No apps. thread, stop the UST tracing. However, this is
441 * not an internal error of the this thread thus setting
442 * the health error code to a normal exit.
443 */
444 err = 0;
445 goto error;
446 }
447
448 rcu_read_unlock();
449 session_unlock_list();
450 }
451 } while (node != NULL);
452
453 health_poll_entry();
454 /* Futex wait on queue. Blocking call on futex() */
455 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
456 health_poll_exit();
457 }
458 /* Normal exit, no error */
459 err = 0;
460
461error:
462 /* Clean up wait queue. */
463 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
464 &wait_queue.head, head) {
465 cds_list_del(&wait_node->head);
466 wait_queue.count--;
467 free(wait_node);
468 }
469
470 /* Empty command queue. */
471 for (;;) {
472 /* Dequeue command for registration */
473 node = cds_wfcq_dequeue_blocking(
474 &notifiers->ust_cmd_queue->head,
475 &notifiers->ust_cmd_queue->tail);
476 if (node == NULL) {
477 break;
478 }
0114db0e 479 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
480 ret = close(ust_cmd->sock);
481 if (ret < 0) {
482 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
483 }
484 lttng_fd_put(LTTNG_FD_APPS, 1);
485 free(ust_cmd);
486 }
487
488error_testpoint:
489 DBG("Dispatch thread dying");
490 if (err) {
491 health_error();
492 ERR("Health error occurred in %s", __func__);
493 }
412d7227 494 health_unregister(the_health_sessiond);
5d1b0219
JG
495 rcu_unregister_thread();
496 return NULL;
497}
498
499static bool shutdown_ust_dispatch_thread(void *data)
500{
7966af57 501 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
502
503 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
504 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
505 return true;
506}
507
508bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
509 int apps_cmd_pipe_write_fd,
510 int apps_cmd_notify_pipe_write_fd)
511{
512 struct lttng_thread *thread;
513 struct thread_notifiers *notifiers;
514
64803277 515 notifiers = zmalloc<thread_notifiers>();
5d1b0219
JG
516 if (!notifiers) {
517 goto error;
518 }
519 notifiers->ust_cmd_queue = cmd_queue;
520 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
521 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
522
523 thread = lttng_thread_create("UST registration dispatch",
524 thread_dispatch_ust_registration,
525 shutdown_ust_dispatch_thread,
526 cleanup_ust_dispatch_thread,
527 notifiers);
528 if (!thread) {
529 goto error;
530 }
531 lttng_thread_put(thread);
532 return true;
533error:
534 free(notifiers);
535 return false;
536}
This page took 0.069989 seconds and 4 git commands to generate.