Standardize quit pipes behavior
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
#include <errno.h>
#include <signal.h>

#include <common/pipe.hpp>
#include <common/utils.hpp>

#include "manage-consumer.hpp"
#include "testpoint.hpp"
#include "health-sessiond.hpp"
#include "utils.hpp"
#include "thread.hpp"
#include "ust-consumer.hpp"
21
namespace {
/*
 * State shared between launch_consumer_management_thread() and the consumer
 * management thread it spawns. Allocated by the launcher and released by
 * cleanup_consumer_management_thread().
 */
struct thread_notifiers {
	/*
	 * The thread polls the read end of this pipe; the shutdown callback
	 * notifies its write end to ask the thread to exit.
	 */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon state (sockets, paths, lock) managed by the thread. */
	struct consumer_data *consumer_data;
	/* Posted once by the thread when its initialization succeeds or fails. */
	sem_t ready;
	/* 0 when the thread initialized successfully, -1 otherwise. */
	int initialization_result;
};
} /* namespace */
30
31 static void mark_thread_as_ready(struct thread_notifiers *notifiers)
32 {
33 DBG("Marking consumer management thread as ready");
34 notifiers->initialization_result = 0;
35 sem_post(&notifiers->ready);
36 }
37
38 static void mark_thread_intialization_as_failed(
39 struct thread_notifiers *notifiers)
40 {
41 ERR("Consumer management thread entering error state");
42 notifiers->initialization_result = -1;
43 sem_post(&notifiers->ready);
44 }
45
46 static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
47 {
48 DBG("Waiting for consumer management thread to be ready");
49 sem_wait(&notifiers->ready);
50 DBG("Consumer management thread is ready");
51 }
52
53 /*
54 * This thread manage the consumer error sent back to the session daemon.
55 */
56 static void *thread_consumer_management(void *data)
57 {
58 int sock = -1, i, ret, err = -1, should_quit = 0;
59 uint32_t nb_fd;
60 enum lttcomm_return_code code;
61 struct lttng_poll_event events;
62 struct thread_notifiers *notifiers = (thread_notifiers *) data;
63 struct consumer_data *consumer_data = notifiers->consumer_data;
64 const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
65 struct consumer_socket *cmd_socket_wrapper = NULL;
66
67 DBG("[thread] Manage consumer started");
68
69 rcu_register_thread();
70 rcu_thread_online();
71
72 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
73
74 health_code_update();
75
76 /*
77 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
78 * metadata_sock. Nothing more will be added to this poll set.
79 */
80 ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
81 if (ret < 0) {
82 mark_thread_intialization_as_failed(notifiers);
83 goto error_poll;
84 }
85
86 ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR);
87 if (ret < 0) {
88 mark_thread_intialization_as_failed(notifiers);
89 goto error;
90 }
91
92 /*
93 * The error socket here is already in a listening state which was done
94 * just before spawning this thread to avoid a race between the consumer
95 * daemon exec trying to connect and the listen() call.
96 */
97 ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
98 if (ret < 0) {
99 mark_thread_intialization_as_failed(notifiers);
100 goto error;
101 }
102
103 health_code_update();
104
105 /* Infinite blocking call, waiting for transmission */
106 health_poll_entry();
107
108 if (testpoint(sessiond_thread_manage_consumer)) {
109 mark_thread_intialization_as_failed(notifiers);
110 goto error;
111 }
112
113 ret = lttng_poll_wait(&events, -1);
114 health_poll_exit();
115 if (ret < 0) {
116 mark_thread_intialization_as_failed(notifiers);
117 goto error;
118 }
119
120 nb_fd = ret;
121
122 for (i = 0; i < nb_fd; i++) {
123 /* Fetch once the poll data */
124 const auto revents = LTTNG_POLL_GETEV(&events, i);
125 const auto pollfd = LTTNG_POLL_GETFD(&events, i);
126
127 health_code_update();
128
129 /* Activity on thread quit pipe, exiting. */
130 if (pollfd == thread_quit_pipe_fd) {
131 DBG("Activity on thread quit pipe");
132 err = 0;
133 mark_thread_intialization_as_failed(notifiers);
134 goto exit;
135 } else if (pollfd == consumer_data->err_sock) {
136 /* Event on the registration socket */
137 if (revents & LPOLLIN) {
138 continue;
139 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
140 ERR("consumer err socket poll error");
141 mark_thread_intialization_as_failed(notifiers);
142 goto error;
143 } else {
144 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
145 mark_thread_intialization_as_failed(notifiers);
146 goto error;
147 }
148 }
149 }
150
151 sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
152 if (sock < 0) {
153 mark_thread_intialization_as_failed(notifiers);
154 goto error;
155 }
156
157 /*
158 * Set the CLOEXEC flag. Return code is useless because either way, the
159 * show must go on.
160 */
161 (void) utils_set_fd_cloexec(sock);
162
163 health_code_update();
164
165 DBG2("Receiving code from consumer err_sock");
166
167 /* Getting status code from kconsumerd */
168 ret = lttcomm_recv_unix_sock(sock, &code,
169 sizeof(enum lttcomm_return_code));
170 if (ret <= 0) {
171 mark_thread_intialization_as_failed(notifiers);
172 goto error;
173 }
174
175 health_code_update();
176 if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
177 ERR("consumer error when waiting for SOCK_READY : %s",
178 lttcomm_get_readable_code((lttcomm_return_code) -code));
179 mark_thread_intialization_as_failed(notifiers);
180 goto error;
181 }
182
183 /* Connect both command and metadata sockets. */
184 consumer_data->cmd_sock =
185 lttcomm_connect_unix_sock(
186 consumer_data->cmd_unix_sock_path);
187 consumer_data->metadata_fd =
188 lttcomm_connect_unix_sock(
189 consumer_data->cmd_unix_sock_path);
190 if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
191 PERROR("consumer connect cmd socket");
192 mark_thread_intialization_as_failed(notifiers);
193 goto error;
194 }
195
196 consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
197
198 /* Create metadata socket lock. */
199 consumer_data->metadata_sock.lock = zmalloc<pthread_mutex_t>();
200 if (consumer_data->metadata_sock.lock == NULL) {
201 PERROR("zmalloc pthread mutex");
202 mark_thread_intialization_as_failed(notifiers);
203 goto error;
204 }
205 pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
206
207 DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
208 DBG("Consumer metadata socket ready (fd: %d)",
209 consumer_data->metadata_fd);
210
211 /*
212 * Remove the consumerd error sock since we've established a connection.
213 */
214 ret = lttng_poll_del(&events, consumer_data->err_sock);
215 if (ret < 0) {
216 mark_thread_intialization_as_failed(notifiers);
217 goto error;
218 }
219
220 /* Add new accepted error socket. */
221 ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
222 if (ret < 0) {
223 mark_thread_intialization_as_failed(notifiers);
224 goto error;
225 }
226
227 /* Add metadata socket that is successfully connected. */
228 ret = lttng_poll_add(&events, consumer_data->metadata_fd,
229 LPOLLIN | LPOLLRDHUP);
230 if (ret < 0) {
231 mark_thread_intialization_as_failed(notifiers);
232 goto error;
233 }
234
235 health_code_update();
236
237 /*
238 * Transfer the write-end of the channel monitoring pipe to the consumer
239 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
240 */
241 cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
242 if (!cmd_socket_wrapper) {
243 mark_thread_intialization_as_failed(notifiers);
244 goto error;
245 }
246 cmd_socket_wrapper->lock = &consumer_data->lock;
247
248 pthread_mutex_lock(cmd_socket_wrapper->lock);
249 ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
250 if (ret) {
251 ERR("Failed to send sessiond uuid to consumer daemon");
252 mark_thread_intialization_as_failed(notifiers);
253 pthread_mutex_unlock(cmd_socket_wrapper->lock);
254 goto error;
255 }
256 pthread_mutex_unlock(cmd_socket_wrapper->lock);
257
258 ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
259 consumer_data->channel_monitor_pipe);
260 if (ret) {
261 mark_thread_intialization_as_failed(notifiers);
262 goto error;
263 }
264
265 /* Discard the socket wrapper as it is no longer needed. */
266 consumer_destroy_socket(cmd_socket_wrapper);
267 cmd_socket_wrapper = NULL;
268
269 /* The thread is completely initialized, signal that it is ready. */
270 mark_thread_as_ready(notifiers);
271
272 /* Infinite blocking call, waiting for transmission */
273 while (1) {
274 health_code_update();
275
276 /* Exit the thread because the thread quit pipe has been triggered. */
277 if (should_quit) {
278 /* Not a health error. */
279 err = 0;
280 goto exit;
281 }
282
283 health_poll_entry();
284 ret = lttng_poll_wait(&events, -1);
285 health_poll_exit();
286 if (ret < 0) {
287 goto error;
288 }
289
290 nb_fd = ret;
291
292 for (i = 0; i < nb_fd; i++) {
293 /* Fetch once the poll data */
294 const auto revents = LTTNG_POLL_GETEV(&events, i);
295 const auto pollfd = LTTNG_POLL_GETFD(&events, i);
296
297 health_code_update();
298
299 /*
300 * Thread quit pipe has been triggered, flag that we should stop
301 * but continue the current loop to handle potential data from
302 * consumer.
303 */
304 if (pollfd == thread_quit_pipe_fd) {
305 should_quit = 1;
306 } else if (pollfd == sock) {
307 /* Event on the consumerd socket */
308 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
309 && !(revents & LPOLLIN)) {
310 ERR("consumer err socket second poll error");
311 goto error;
312 }
313 health_code_update();
314 /* Wait for any kconsumerd error */
315 ret = lttcomm_recv_unix_sock(sock, &code,
316 sizeof(enum lttcomm_return_code));
317 if (ret <= 0) {
318 ERR("consumer closed the command socket");
319 goto error;
320 }
321
322 ERR("consumer return code : %s",
323 lttcomm_get_readable_code((lttcomm_return_code) -code));
324
325 goto exit;
326 } else if (pollfd == consumer_data->metadata_fd) {
327 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
328 && !(revents & LPOLLIN)) {
329 ERR("consumer err metadata socket second poll error");
330 goto error;
331 }
332 /* UST metadata requests */
333 ret = ust_consumer_metadata_request(
334 &consumer_data->metadata_sock);
335 if (ret < 0) {
336 ERR("Handling metadata request");
337 goto error;
338 }
339 }
340 /* No need for an else branch all FDs are tested prior. */
341 }
342 health_code_update();
343 }
344
345 exit:
346 error:
347 /*
348 * We lock here because we are about to close the sockets and some other
349 * thread might be using them so get exclusive access which will abort all
350 * other consumer command by other threads.
351 */
352 pthread_mutex_lock(&consumer_data->lock);
353
354 /* Immediately set the consumerd state to stopped */
355 if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
356 uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
357 } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
358 consumer_data->type == LTTNG_CONSUMER32_UST) {
359 uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
360 } else {
361 /* Code flow error... */
362 abort();
363 }
364
365 if (consumer_data->err_sock >= 0) {
366 ret = close(consumer_data->err_sock);
367 if (ret) {
368 PERROR("close");
369 }
370 consumer_data->err_sock = -1;
371 }
372 if (consumer_data->cmd_sock >= 0) {
373 ret = close(consumer_data->cmd_sock);
374 if (ret) {
375 PERROR("close");
376 }
377 consumer_data->cmd_sock = -1;
378 }
379 if (consumer_data->metadata_sock.fd_ptr &&
380 *consumer_data->metadata_sock.fd_ptr >= 0) {
381 ret = close(*consumer_data->metadata_sock.fd_ptr);
382 if (ret) {
383 PERROR("close");
384 }
385 }
386 if (sock >= 0) {
387 ret = close(sock);
388 if (ret) {
389 PERROR("close");
390 }
391 }
392
393 unlink(consumer_data->err_unix_sock_path);
394 unlink(consumer_data->cmd_unix_sock_path);
395 pthread_mutex_unlock(&consumer_data->lock);
396
397 /* Cleanup metadata socket mutex. */
398 if (consumer_data->metadata_sock.lock) {
399 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
400 free(consumer_data->metadata_sock.lock);
401 }
402 lttng_poll_clean(&events);
403
404 if (cmd_socket_wrapper) {
405 consumer_destroy_socket(cmd_socket_wrapper);
406 }
407 error_poll:
408 if (err) {
409 health_error();
410 ERR("Health error occurred in %s", __func__);
411 }
412 health_unregister(the_health_sessiond);
413 DBG("consumer thread cleanup completed");
414
415 rcu_thread_offline();
416 rcu_unregister_thread();
417
418 return NULL;
419 }
420
421 static bool shutdown_consumer_management_thread(void *data)
422 {
423 struct thread_notifiers *notifiers = (thread_notifiers *) data;
424 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
425
426 return notify_thread_pipe(write_fd) == 1;
427 }
428
429 static void cleanup_consumer_management_thread(void *data)
430 {
431 struct thread_notifiers *notifiers = (thread_notifiers *) data;
432
433 lttng_pipe_destroy(notifiers->quit_pipe);
434 free(notifiers);
435 }
436
437 bool launch_consumer_management_thread(struct consumer_data *consumer_data)
438 {
439 struct lttng_pipe *quit_pipe;
440 struct thread_notifiers *notifiers = NULL;
441 struct lttng_thread *thread;
442
443 notifiers = zmalloc<thread_notifiers>();
444 if (!notifiers) {
445 goto error_alloc;
446 }
447
448 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
449 if (!quit_pipe) {
450 goto error;
451 }
452 notifiers->quit_pipe = quit_pipe;
453 notifiers->consumer_data = consumer_data;
454 sem_init(&notifiers->ready, 0, 0);
455
456 thread = lttng_thread_create("Consumer management",
457 thread_consumer_management,
458 shutdown_consumer_management_thread,
459 cleanup_consumer_management_thread,
460 notifiers);
461 if (!thread) {
462 goto error;
463 }
464 wait_until_thread_is_ready(notifiers);
465 lttng_thread_put(thread);
466 if (notifiers->initialization_result) {
467 return false;
468 }
469 return true;
470 error:
471 cleanup_consumer_management_thread(notifiers);
472 error_alloc:
473 return false;
474 }
This page took 0.038612 seconds and 4 git commands to generate.