Commit | Line | Data |
---|---|---|
5d1b0219 | 1 | /* |
21cf9b6b | 2 | * Copyright (C) 2011 EfficiOS Inc. |
ab5be9fa MJ |
3 | * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com> |
4 | * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
5d1b0219 | 5 | * |
ab5be9fa | 6 | * SPDX-License-Identifier: GPL-2.0-only |
5d1b0219 | 7 | * |
5d1b0219 JG |
8 | */ |
9 | ||
c9e313bc | 10 | #include "dispatch.hpp" |
c9e313bc SM |
11 | #include "fd-limit.hpp" |
12 | #include "health-sessiond.hpp" | |
13 | #include "lttng-sessiond.hpp" | |
28ab034a | 14 | #include "testpoint.hpp" |
c9e313bc | 15 | #include "thread.hpp" |
28ab034a JG |
16 | #include "ust-app.hpp" |
17 | ||
18 | #include <common/futex.hpp> | |
19 | #include <common/macros.hpp> | |
56047f5a | 20 | #include <common/urcu.hpp> |
28ab034a JG |
21 | |
22 | #include <stddef.h> | |
23 | #include <stdlib.h> | |
24 | #include <urcu.h> | |
5d1b0219 | 25 | |
namespace {
/*
 * Context handed to the UST registration dispatch thread and to its
 * shutdown/cleanup callbacks.
 */
struct thread_notifiers {
	/* Queue from which pending UST registration commands are dequeued. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Pipe write end on which application command sockets are sent. */
	int apps_cmd_pipe_write_fd;
	/* Pipe write end on which application notify sockets are sent. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set to 1 by the shutdown callback to make the dispatch loop exit. */
	int dispatch_thread_exit;
};
} /* namespace */
5d1b0219 JG |
34 | |
35 | /* | |
36 | * For each tracing session, update newly registered apps. The session list | |
37 | * lock MUST be acquired before calling this. | |
38 | */ | |
39 | static void update_ust_app(int app_sock) | |
40 | { | |
41 | struct ltt_session *sess, *stmp; | |
42 | const struct ltt_session_list *session_list = session_get_list(); | |
70670472 | 43 | struct ust_app *app; |
5d1b0219 JG |
44 | |
45 | /* Consumer is in an ERROR state. Stop any application update. */ | |
412d7227 | 46 | if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) { |
5d1b0219 JG |
47 | /* Stop the update process since the consumer is dead. */ |
48 | return; | |
49 | } | |
50 | ||
56047f5a | 51 | lttng::urcu::read_lock_guard read_lock; |
a0377dfe | 52 | LTTNG_ASSERT(app_sock >= 0); |
70670472 | 53 | app = ust_app_find_by_sock(app_sock); |
cd9adb8b | 54 | if (app == nullptr) { |
70670472 JR |
55 | /* |
56 | * Application can be unregistered before so | |
57 | * this is possible hence simply stopping the | |
58 | * update. | |
59 | */ | |
28ab034a | 60 | DBG3("UST app update failed to find app sock %d", app_sock); |
56047f5a | 61 | return; |
70670472 JR |
62 | } |
63 | ||
64 | /* Update all event notifiers for the app. */ | |
65 | ust_app_global_update_event_notifier_rules(app); | |
66 | ||
5d1b0219 | 67 | /* For all tracing session(s) */ |
28ab034a | 68 | cds_list_for_each_entry_safe (sess, stmp, &session_list->head, list) { |
5d1b0219 JG |
69 | if (!session_get(sess)) { |
70 | continue; | |
71 | } | |
72 | session_lock(sess); | |
28ab034a | 73 | if (!sess->active || !sess->ust_session || !sess->ust_session->active) { |
5d1b0219 JG |
74 | goto unlock_session; |
75 | } | |
76 | ||
5d1b0219 | 77 | ust_app_global_update(sess->ust_session, app); |
5d1b0219 JG |
78 | unlock_session: |
79 | session_unlock(sess); | |
80 | session_put(sess); | |
81 | } | |
82 | } | |
83 | ||
84 | /* | |
85 | * Sanitize the wait queue of the dispatch registration thread meaning removing | |
86 | * invalid nodes from it. This is to avoid memory leaks for the case the UST | |
87 | * notify socket is never received. | |
88 | */ | |
89 | static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) | |
90 | { | |
91 | int ret, nb_fd = 0, i; | |
92 | unsigned int fd_added = 0; | |
93 | struct lttng_poll_event events; | |
cd9adb8b | 94 | struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node; |
5d1b0219 | 95 | |
a0377dfe | 96 | LTTNG_ASSERT(wait_queue); |
5d1b0219 JG |
97 | |
98 | lttng_poll_init(&events); | |
99 | ||
100 | /* Just skip everything for an empty queue. */ | |
101 | if (!wait_queue->count) { | |
102 | goto end; | |
103 | } | |
104 | ||
105 | ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); | |
106 | if (ret < 0) { | |
107 | goto error_create; | |
108 | } | |
109 | ||
28ab034a | 110 | cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) { |
a0377dfe | 111 | LTTNG_ASSERT(wait_node->app); |
1524f98c | 112 | ret = lttng_poll_add(&events, wait_node->app->sock, LPOLLIN); |
5d1b0219 JG |
113 | if (ret < 0) { |
114 | goto error; | |
115 | } | |
116 | ||
117 | fd_added = 1; | |
118 | } | |
119 | ||
120 | if (!fd_added) { | |
121 | goto end; | |
122 | } | |
123 | ||
124 | /* | |
125 | * Poll but don't block so we can quickly identify the faulty events and | |
126 | * clean them afterwards from the wait queue. | |
127 | */ | |
128 | ret = lttng_poll_wait(&events, 0); | |
129 | if (ret < 0) { | |
130 | goto error; | |
131 | } | |
132 | nb_fd = ret; | |
133 | ||
134 | for (i = 0; i < nb_fd; i++) { | |
135 | /* Get faulty FD. */ | |
136 | uint32_t revents = LTTNG_POLL_GETEV(&events, i); | |
137 | int pollfd = LTTNG_POLL_GETFD(&events, i); | |
138 | ||
28ab034a JG |
139 | cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) { |
140 | if (pollfd == wait_node->app->sock && (revents & (LPOLLHUP | LPOLLERR))) { | |
5d1b0219 JG |
141 | cds_list_del(&wait_node->head); |
142 | wait_queue->count--; | |
a7db814e | 143 | ust_app_put(wait_node->app); |
5d1b0219 | 144 | free(wait_node); |
a7db814e | 145 | |
5d1b0219 JG |
146 | /* |
147 | * Silence warning of use-after-free in | |
148 | * cds_list_for_each_entry_safe which uses | |
149 | * __typeof__(*wait_node). | |
150 | */ | |
cd9adb8b | 151 | wait_node = nullptr; |
5d1b0219 JG |
152 | break; |
153 | } else { | |
154 | ERR("Unexpected poll events %u for sock %d", revents, pollfd); | |
155 | goto error; | |
156 | } | |
157 | } | |
158 | } | |
159 | ||
160 | if (nb_fd > 0) { | |
161 | DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); | |
162 | } | |
163 | ||
164 | end: | |
165 | lttng_poll_clean(&events); | |
166 | return; | |
167 | ||
168 | error: | |
169 | lttng_poll_clean(&events); | |
170 | error_create: | |
171 | ERR("Unable to sanitize wait queue"); | |
172 | return; | |
173 | } | |
174 | ||
175 | /* | |
176 | * Send a socket to a thread This is called from the dispatch UST registration | |
177 | * thread once all sockets are set for the application. | |
178 | * | |
179 | * The sock value can be invalid, we don't really care, the thread will handle | |
180 | * it and make the necessary cleanup if so. | |
181 | * | |
182 | * On success, return 0 else a negative value being the errno message of the | |
183 | * write(). | |
184 | */ | |
185 | static int send_socket_to_thread(int fd, int sock) | |
186 | { | |
187 | ssize_t ret; | |
188 | ||
189 | /* | |
190 | * It's possible that the FD is set as invalid with -1 concurrently just | |
191 | * before calling this function being a shutdown state of the thread. | |
192 | */ | |
193 | if (fd < 0) { | |
194 | ret = -EBADF; | |
195 | goto error; | |
196 | } | |
197 | ||
198 | ret = lttng_write(fd, &sock, sizeof(sock)); | |
199 | if (ret < sizeof(sock)) { | |
200 | PERROR("write apps pipe %d", fd); | |
201 | if (ret < 0) { | |
202 | ret = -errno; | |
203 | } | |
204 | goto error; | |
205 | } | |
206 | ||
207 | /* All good. Don't send back the write positive ret value. */ | |
208 | ret = 0; | |
209 | error: | |
210 | return (int) ret; | |
211 | } | |
212 | ||
/*
 * Cleanup callback registered with the dispatch thread: frees the
 * thread_notifiers allocated by launch_ust_dispatch_thread().
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;

	free(notifiers);
}
217 | ||
218 | /* | |
219 | * Dispatch request from the registration threads to the application | |
220 | * communication thread. | |
221 | */ | |
222 | static void *thread_dispatch_ust_registration(void *data) | |
223 | { | |
224 | int ret, err = -1; | |
225 | struct cds_wfcq_node *node; | |
cd9adb8b JG |
226 | struct ust_command *ust_cmd = nullptr; |
227 | struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node; | |
5d1b0219 JG |
228 | struct ust_reg_wait_queue wait_queue = { |
229 | .count = 0, | |
1c9a0b0e | 230 | .head = {}, |
5d1b0219 | 231 | }; |
7966af57 | 232 | struct thread_notifiers *notifiers = (thread_notifiers *) data; |
5d1b0219 JG |
233 | |
234 | rcu_register_thread(); | |
235 | ||
28ab034a | 236 | health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH); |
5d1b0219 JG |
237 | |
238 | if (testpoint(sessiond_thread_app_reg_dispatch)) { | |
239 | goto error_testpoint; | |
240 | } | |
241 | ||
242 | health_code_update(); | |
243 | ||
244 | CDS_INIT_LIST_HEAD(&wait_queue.head); | |
245 | ||
246 | DBG("[thread] Dispatch UST command started"); | |
247 | ||
248 | for (;;) { | |
249 | health_code_update(); | |
250 | ||
251 | /* Atomically prepare the queue futex */ | |
252 | futex_nto1_prepare(¬ifiers->ust_cmd_queue->futex); | |
253 | ||
254 | if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) { | |
255 | break; | |
256 | } | |
257 | ||
258 | do { | |
cd9adb8b JG |
259 | struct ust_app *app = nullptr; |
260 | ust_cmd = nullptr; | |
5d1b0219 JG |
261 | |
262 | /* | |
263 | * Make sure we don't have node(s) that have hung up before receiving | |
264 | * the notify socket. This is to clean the list in order to avoid | |
265 | * memory leaks from notify socket that are never seen. | |
266 | */ | |
267 | sanitize_wait_queue(&wait_queue); | |
268 | ||
269 | health_code_update(); | |
270 | /* Dequeue command for registration */ | |
28ab034a JG |
271 | node = cds_wfcq_dequeue_blocking(¬ifiers->ust_cmd_queue->head, |
272 | ¬ifiers->ust_cmd_queue->tail); | |
cd9adb8b | 273 | if (node == nullptr) { |
5d1b0219 JG |
274 | DBG("Woken up but nothing in the UST command queue"); |
275 | /* Continue thread execution */ | |
276 | break; | |
277 | } | |
278 | ||
0114db0e | 279 | ust_cmd = lttng::utils::container_of(node, &ust_command::node); |
5d1b0219 JG |
280 | |
281 | DBG("Dispatching UST registration pid:%d ppid:%d uid:%d" | |
28ab034a JG |
282 | " gid:%d sock:%d name:%s (version %d.%d)", |
283 | ust_cmd->reg_msg.pid, | |
284 | ust_cmd->reg_msg.ppid, | |
285 | ust_cmd->reg_msg.uid, | |
286 | ust_cmd->reg_msg.gid, | |
287 | ust_cmd->sock, | |
288 | ust_cmd->reg_msg.name, | |
289 | ust_cmd->reg_msg.major, | |
290 | ust_cmd->reg_msg.minor); | |
5d1b0219 | 291 | |
b623cb6a | 292 | if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) { |
64803277 | 293 | wait_node = zmalloc<ust_reg_wait_node>(); |
5d1b0219 JG |
294 | if (!wait_node) { |
295 | PERROR("zmalloc wait_node dispatch"); | |
296 | ret = close(ust_cmd->sock); | |
297 | if (ret < 0) { | |
298 | PERROR("close ust sock dispatch %d", ust_cmd->sock); | |
299 | } | |
300 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
301 | free(ust_cmd); | |
cd9adb8b | 302 | ust_cmd = nullptr; |
5d1b0219 JG |
303 | goto error; |
304 | } | |
305 | CDS_INIT_LIST_HEAD(&wait_node->head); | |
306 | ||
307 | /* Create application object if socket is CMD. */ | |
28ab034a | 308 | wait_node->app = ust_app_create(&ust_cmd->reg_msg, ust_cmd->sock); |
5d1b0219 JG |
309 | if (!wait_node->app) { |
310 | ret = close(ust_cmd->sock); | |
311 | if (ret < 0) { | |
312 | PERROR("close ust sock dispatch %d", ust_cmd->sock); | |
313 | } | |
314 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
315 | free(wait_node); | |
cd9adb8b | 316 | wait_node = nullptr; |
5d1b0219 | 317 | free(ust_cmd); |
cd9adb8b | 318 | ust_cmd = nullptr; |
5d1b0219 JG |
319 | continue; |
320 | } | |
321 | /* | |
322 | * Add application to the wait queue so we can set the notify | |
323 | * socket before putting this object in the global ht. | |
324 | */ | |
325 | cds_list_add(&wait_node->head, &wait_queue.head); | |
326 | wait_queue.count++; | |
327 | ||
328 | free(ust_cmd); | |
cd9adb8b | 329 | ust_cmd = nullptr; |
5d1b0219 JG |
330 | /* |
331 | * We have to continue here since we don't have the notify | |
332 | * socket and the application MUST be added to the hash table | |
333 | * only at that moment. | |
334 | */ | |
335 | continue; | |
336 | } else { | |
337 | /* | |
338 | * Look for the application in the local wait queue and set the | |
339 | * notify socket if found. | |
340 | */ | |
28ab034a JG |
341 | cds_list_for_each_entry_safe ( |
342 | wait_node, tmp_wait_node, &wait_queue.head, head) { | |
5d1b0219 JG |
343 | health_code_update(); |
344 | if (wait_node->app->pid == ust_cmd->reg_msg.pid) { | |
345 | wait_node->app->notify_sock = ust_cmd->sock; | |
346 | cds_list_del(&wait_node->head); | |
347 | wait_queue.count--; | |
348 | app = wait_node->app; | |
349 | free(wait_node); | |
cd9adb8b | 350 | wait_node = nullptr; |
28ab034a JG |
351 | DBG3("UST app notify socket %d is set", |
352 | ust_cmd->sock); | |
5d1b0219 JG |
353 | break; |
354 | } | |
355 | } | |
356 | ||
357 | /* | |
358 | * With no application at this stage the received socket is | |
359 | * basically useless so close it before we free the cmd data | |
360 | * structure for good. | |
361 | */ | |
362 | if (!app) { | |
363 | ret = close(ust_cmd->sock); | |
364 | if (ret < 0) { | |
365 | PERROR("close ust sock dispatch %d", ust_cmd->sock); | |
366 | } | |
367 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
368 | } | |
369 | free(ust_cmd); | |
cd9adb8b | 370 | ust_cmd = nullptr; |
5d1b0219 JG |
371 | } |
372 | ||
373 | if (app) { | |
374 | /* | |
375 | * @session_lock_list | |
376 | * | |
377 | * Lock the global session list so from the register up to the | |
378 | * registration done message, no thread can see the application | |
379 | * and change its state. | |
380 | */ | |
381 | session_lock_list(); | |
56047f5a | 382 | lttng::urcu::read_lock_guard read_lock; |
5d1b0219 JG |
383 | |
384 | /* | |
385 | * Add application to the global hash table. This needs to be | |
386 | * done before the update to the UST registry can locate the | |
387 | * application. | |
388 | */ | |
389 | ust_app_add(app); | |
390 | ||
391 | /* Set app version. This call will print an error if needed. */ | |
392 | (void) ust_app_version(app); | |
393 | ||
da873412 JR |
394 | (void) ust_app_setup_event_notifier_group(app); |
395 | ||
5d1b0219 JG |
396 | /* Send notify socket through the notify pipe. */ |
397 | ret = send_socket_to_thread( | |
28ab034a | 398 | notifiers->apps_cmd_notify_pipe_write_fd, app->notify_sock); |
5d1b0219 | 399 | if (ret < 0) { |
5d1b0219 JG |
400 | session_unlock_list(); |
401 | /* | |
402 | * No notify thread, stop the UST tracing. However, this is | |
403 | * not an internal error of the this thread thus setting | |
404 | * the health error code to a normal exit. | |
405 | */ | |
406 | err = 0; | |
407 | goto error; | |
408 | } | |
409 | ||
410 | /* | |
411 | * Update newly registered application with the tracing | |
412 | * registry info already enabled information. | |
413 | */ | |
414 | update_ust_app(app->sock); | |
415 | ||
416 | /* | |
417 | * Don't care about return value. Let the manage apps threads | |
418 | * handle app unregistration upon socket close. | |
419 | */ | |
420 | (void) ust_app_register_done(app); | |
421 | ||
422 | /* | |
423 | * Even if the application socket has been closed, send the app | |
424 | * to the thread and unregistration will take place at that | |
425 | * place. | |
426 | */ | |
28ab034a JG |
427 | ret = send_socket_to_thread(notifiers->apps_cmd_pipe_write_fd, |
428 | app->sock); | |
5d1b0219 | 429 | if (ret < 0) { |
5d1b0219 JG |
430 | session_unlock_list(); |
431 | /* | |
432 | * No apps. thread, stop the UST tracing. However, this is | |
433 | * not an internal error of the this thread thus setting | |
434 | * the health error code to a normal exit. | |
435 | */ | |
436 | err = 0; | |
437 | goto error; | |
438 | } | |
439 | ||
5d1b0219 JG |
440 | session_unlock_list(); |
441 | } | |
cd9adb8b | 442 | } while (node != nullptr); |
5d1b0219 JG |
443 | |
444 | health_poll_entry(); | |
445 | /* Futex wait on queue. Blocking call on futex() */ | |
446 | futex_nto1_wait(¬ifiers->ust_cmd_queue->futex); | |
447 | health_poll_exit(); | |
448 | } | |
449 | /* Normal exit, no error */ | |
450 | err = 0; | |
451 | ||
452 | error: | |
453 | /* Clean up wait queue. */ | |
28ab034a | 454 | cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue.head, head) { |
5d1b0219 JG |
455 | cds_list_del(&wait_node->head); |
456 | wait_queue.count--; | |
457 | free(wait_node); | |
458 | } | |
459 | ||
460 | /* Empty command queue. */ | |
461 | for (;;) { | |
462 | /* Dequeue command for registration */ | |
28ab034a JG |
463 | node = cds_wfcq_dequeue_blocking(¬ifiers->ust_cmd_queue->head, |
464 | ¬ifiers->ust_cmd_queue->tail); | |
cd9adb8b | 465 | if (node == nullptr) { |
5d1b0219 JG |
466 | break; |
467 | } | |
0114db0e | 468 | ust_cmd = lttng::utils::container_of(node, &ust_command::node); |
5d1b0219 JG |
469 | ret = close(ust_cmd->sock); |
470 | if (ret < 0) { | |
471 | PERROR("close ust sock exit dispatch %d", ust_cmd->sock); | |
472 | } | |
473 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
474 | free(ust_cmd); | |
475 | } | |
476 | ||
477 | error_testpoint: | |
478 | DBG("Dispatch thread dying"); | |
479 | if (err) { | |
480 | health_error(); | |
481 | ERR("Health error occurred in %s", __func__); | |
482 | } | |
412d7227 | 483 | health_unregister(the_health_sessiond); |
5d1b0219 | 484 | rcu_unregister_thread(); |
cd9adb8b | 485 | return nullptr; |
5d1b0219 JG |
486 | } |
487 | ||
488 | static bool shutdown_ust_dispatch_thread(void *data) | |
489 | { | |
7966af57 | 490 | struct thread_notifiers *notifiers = (thread_notifiers *) data; |
5d1b0219 JG |
491 | |
492 | CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1); | |
493 | futex_nto1_wake(¬ifiers->ust_cmd_queue->futex); | |
494 | return true; | |
495 | } | |
496 | ||
497 | bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue, | |
28ab034a JG |
498 | int apps_cmd_pipe_write_fd, |
499 | int apps_cmd_notify_pipe_write_fd) | |
5d1b0219 JG |
500 | { |
501 | struct lttng_thread *thread; | |
502 | struct thread_notifiers *notifiers; | |
503 | ||
64803277 | 504 | notifiers = zmalloc<thread_notifiers>(); |
5d1b0219 JG |
505 | if (!notifiers) { |
506 | goto error; | |
507 | } | |
508 | notifiers->ust_cmd_queue = cmd_queue; | |
509 | notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd; | |
510 | notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd; | |
511 | ||
512 | thread = lttng_thread_create("UST registration dispatch", | |
28ab034a JG |
513 | thread_dispatch_ust_registration, |
514 | shutdown_ust_dispatch_thread, | |
515 | cleanup_ust_dispatch_thread, | |
516 | notifiers); | |
5d1b0219 JG |
517 | if (!thread) { |
518 | goto error; | |
519 | } | |
520 | lttng_thread_put(thread); | |
521 | return true; | |
522 | error: | |
523 | free(notifiers); | |
524 | return false; | |
525 | } |