/*
 * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 */
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>

#include <common/pipe.h>
#include <common/utils.h>

#include "manage-consumer.h"
#include "testpoint.h"
#include "health-sessiond.h"
#include "utils.h"
#include "thread.h"
#include "ust-consumer.h"

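/*
 * State shared between the launcher and the consumer management thread.
 * The managed thread records the outcome of its initialization in
 * 'initialization_result' and posts 'ready'; the launcher blocks on the
 * semaphore in wait_until_thread_is_ready() until that outcome is known.
 */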
struct thread_notifiers {
	struct lttng_pipe *quit_pipe;
	struct consumer_data *consumer_data;
	sem_t ready;
	int initialization_result;
};

static void mark_thread_as_ready(struct thread_notifiers *notifiers)
{
	DBG("Marking consumer management thread as ready");
	notifiers->initialization_result = 0;
	sem_post(&notifiers->ready);
}

static void mark_thread_initialization_as_failed(
		struct thread_notifiers *notifiers)
{
	ERR("Consumer management thread entering error state");
	notifiers->initialization_result = -1;
	sem_post(&notifiers->ready);
}

static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
{
	DBG("Waiting for consumer management thread to be ready");
	sem_wait(&notifiers->ready);
	DBG("Consumer management thread is ready");
}

/*
 * This thread manages the consumer error sent back to the session daemon.
 */
static void *thread_consumer_management(void *data)
{
	int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
	uint32_t revents, nb_fd;
	enum lttcomm_return_code code;
	struct lttng_poll_event events;
	struct thread_notifiers *notifiers = (thread_notifiers *) data;
	struct consumer_data *consumer_data = notifiers->consumer_data;
	const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
	struct consumer_socket *cmd_socket_wrapper = NULL;

	DBG("[thread] Manage consumer started");

	rcu_register_thread();
	rcu_thread_online();

	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);

	health_code_update();

	/*
	 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
	 * metadata_sock. Nothing more will be added to this poll set.
	 */
	ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
	if (ret < 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error_poll;
	}

	ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
	if (ret < 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * The error socket here is already in a listening state which was done
	 * just before spawning this thread to avoid a race between the consumer
	 * daemon exec trying to connect and the listen() call.
	 */
	ret = lttng_poll_add(&events, consumer_data->err_sock,
			LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/* Infinite blocking call, waiting for transmission */
	health_poll_entry();

	if (testpoint(sessiond_thread_manage_consumer)) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	ret = lttng_poll_wait(&events, -1);
	health_poll_exit();
	if (ret < 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	nb_fd = ret;

	for (i = 0; i < nb_fd; i++) {
		/* Fetch the poll data once */
		revents = LTTNG_POLL_GETEV(&events, i);
		pollfd = LTTNG_POLL_GETFD(&events, i);

		health_code_update();

		/* Thread quit pipe has been closed. Killing thread. */
		if (pollfd == quit_pipe_read_fd) {
			err = 0;
			mark_thread_initialization_as_failed(notifiers);
			goto exit;
		} else if (pollfd == consumer_data->err_sock) {
			/* Event on the registration socket */
			if (revents & LPOLLIN) {
				continue;
			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
				ERR("consumer err socket poll error");
				mark_thread_initialization_as_failed(notifiers);
				goto error;
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				mark_thread_initialization_as_failed(notifiers);
				goto error;
			}
		}
	}

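	/*
	 * Getting here means the poll set reported activity on the error
	 * socket without an error condition: a consumer daemon is trying to
	 * register. Accept its connection.
	 */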
	sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
	if (sock < 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * Set the CLOEXEC flag. Return code is useless because either way, the
	 * show must go on.
	 */
	(void) utils_set_fd_cloexec(sock);

	health_code_update();

	DBG2("Receiving code from consumer err_sock");

	/* Getting status code from kconsumerd */
	ret = lttcomm_recv_unix_sock(sock, &code,
			sizeof(enum lttcomm_return_code));
	if (ret <= 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();
	if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
		ERR("consumer error when waiting for SOCK_READY : %s",
				lttcomm_get_readable_code((lttcomm_return_code) -code));
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	/* Connect both command and metadata sockets. */
	consumer_data->cmd_sock =
			lttcomm_connect_unix_sock(
					consumer_data->cmd_unix_sock_path);
	consumer_data->metadata_fd =
			lttcomm_connect_unix_sock(
					consumer_data->cmd_unix_sock_path);
	if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
		PERROR("consumer connect cmd socket");
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

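	/*
	 * Note that both connections above target cmd_unix_sock_path: the
	 * metadata socket is a second, dedicated connection to the consumer's
	 * command socket, presumably so that metadata traffic can be handled
	 * on its own channel rather than interleaved with regular commands.
	 */
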
	consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;

	/* Create metadata socket lock. */
	consumer_data->metadata_sock.lock =
			(pthread_mutex_t *) zmalloc(sizeof(pthread_mutex_t));
	if (consumer_data->metadata_sock.lock == NULL) {
		PERROR("zmalloc pthread mutex");
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);

204 DBG("Consumer command socket ready (fd: %d)", consumer_data
->cmd_sock
);
205 DBG("Consumer metadata socket ready (fd: %d)",
206 consumer_data
->metadata_fd
);
209 * Remove the consumerd error sock since we've established a connection.
211 ret
= lttng_poll_del(&events
, consumer_data
->err_sock
);
213 mark_thread_intialization_as_failed(notifiers
);
	/* Add new accepted error socket. */
	ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	/* Add metadata socket that is successfully connected. */
	ret = lttng_poll_add(&events, consumer_data->metadata_fd,
			LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

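	/*
	 * The poll set now contains exactly the three fds announced to
	 * lttng_poll_create() above: the quit pipe, the accepted error socket
	 * and the metadata socket.
	 */
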
	health_code_update();

	/*
	 * Transfer the write-end of the channel monitoring pipe to the consumer
	 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
	 */
	cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
	if (!cmd_socket_wrapper) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}
	cmd_socket_wrapper->lock = &consumer_data->lock;

	pthread_mutex_lock(cmd_socket_wrapper->lock);
	ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
	if (ret) {
		ERR("Failed to send sessiond uuid to consumer daemon");
		mark_thread_initialization_as_failed(notifiers);
		pthread_mutex_unlock(cmd_socket_wrapper->lock);
		goto error;
	}
	pthread_mutex_unlock(cmd_socket_wrapper->lock);

	ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
			consumer_data->channel_monitor_pipe);
	if (ret) {
		mark_thread_initialization_as_failed(notifiers);
		goto error;
	}

	/* Discard the socket wrapper as it is no longer needed. */
	consumer_destroy_socket(cmd_socket_wrapper);
	cmd_socket_wrapper = NULL;

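	/*
	 * Destroying the wrapper releases the wrapper object only; the
	 * underlying command socket fd remains reachable through
	 * consumer_data and is closed in the cleanup path below.
	 */
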
	/* The thread is completely initialized, signal that it is ready. */
	mark_thread_as_ready(notifiers);

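	/*
	 * Past this point, initialization failures can no longer be reported
	 * through the semaphore; errors below simply tear the thread down via
	 * the error path.
	 */
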
	/* Infinite blocking call, waiting for transmission */
	while (1) {
		health_code_update();

		/* Exit the thread because the thread quit pipe has been triggered. */
		if (should_quit) {
			/* Not a health error. */
			err = 0;
			goto exit;
		}

		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		health_poll_exit();
		if (ret < 0) {
			goto error;
		}

		nb_fd = ret;

		for (i = 0; i < nb_fd; i++) {
			/* Fetch the poll data once */
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			health_code_update();

			/*
			 * Thread quit pipe has been triggered, flag that we should stop
			 * but continue the current loop to handle potential data from
			 * the consumer.
			 */
			if (pollfd == quit_pipe_read_fd) {
				should_quit = 1;
			} else if (pollfd == sock) {
				/* Event on the consumerd socket */
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
						&& !(revents & LPOLLIN)) {
					ERR("consumer err socket second poll error");
					goto error;
				}
				health_code_update();
				/* Wait for any kconsumerd error */
				ret = lttcomm_recv_unix_sock(sock, &code,
						sizeof(enum lttcomm_return_code));
				if (ret <= 0) {
					ERR("consumer closed the command socket");
					goto error;
				}

				ERR("consumer return code : %s",
						lttcomm_get_readable_code((lttcomm_return_code) -code));

				goto exit;
			} else if (pollfd == consumer_data->metadata_fd) {
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
						&& !(revents & LPOLLIN)) {
					ERR("consumer err metadata socket second poll error");
					goto error;
				}
				/* UST metadata requests */
				ret = ust_consumer_metadata_request(
						&consumer_data->metadata_sock);
				if (ret < 0) {
					ERR("Handling metadata request");
					goto error;
				}
			}
			/* No need for an else branch; all FDs are tested prior. */
		}
		health_code_update();
	}

exit:
error:
	/*
	 * We lock here because we are about to close the sockets and some other
	 * thread might be using them, so we take exclusive access, which aborts
	 * all other consumer commands issued by other threads.
	 */
	pthread_mutex_lock(&consumer_data->lock);

	/* Immediately set the consumerd state to stopped */
	if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
		uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
	} else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
			consumer_data->type == LTTNG_CONSUMER32_UST) {
		uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
	} else {
		/* Code flow error... */
		abort();
	}

	if (consumer_data->err_sock >= 0) {
		ret = close(consumer_data->err_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->err_sock = -1;
	}
	if (consumer_data->cmd_sock >= 0) {
		ret = close(consumer_data->cmd_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->cmd_sock = -1;
	}
	if (consumer_data->metadata_sock.fd_ptr &&
			*consumer_data->metadata_sock.fd_ptr >= 0) {
		ret = close(*consumer_data->metadata_sock.fd_ptr);
		if (ret) {
			PERROR("close");
		}
	}
	if (sock >= 0) {
		ret = close(sock);
		if (ret) {
			PERROR("close");
		}
	}

	unlink(consumer_data->err_unix_sock_path);
	unlink(consumer_data->cmd_unix_sock_path);
	pthread_mutex_unlock(&consumer_data->lock);

	/* Cleanup metadata socket mutex. */
	if (consumer_data->metadata_sock.lock) {
		pthread_mutex_destroy(consumer_data->metadata_sock.lock);
		free(consumer_data->metadata_sock.lock);
	}
	lttng_poll_clean(&events);

	if (cmd_socket_wrapper) {
		consumer_destroy_socket(cmd_socket_wrapper);
	}
error_poll:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(the_health_sessiond);
	DBG("consumer thread cleanup completed");

	rcu_thread_offline();
	rcu_unregister_thread();
	return NULL;
}

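/*
 * Ask the consumer management thread to stop by writing to its quit pipe.
 * notify_thread_pipe() returns the number of bytes written; a single byte
 * written is treated as a successful notification.
 */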
static bool shutdown_consumer_management_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;
	const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);

	return notify_thread_pipe(write_fd) == 1;
}

static void cleanup_consumer_management_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;

	lttng_pipe_destroy(notifiers->quit_pipe);
	free(notifiers);
}

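/*
 * Launch the consumer management thread and block until it reports the
 * result of its initialization through the thread_notifiers handshake.
 */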
bool launch_consumer_management_thread(struct consumer_data *consumer_data)
{
	struct lttng_pipe *quit_pipe;
	struct thread_notifiers *notifiers = NULL;
	struct lttng_thread *thread;

	notifiers = (thread_notifiers *) zmalloc(sizeof(*notifiers));
	if (!notifiers) {
		goto error_alloc;
	}

	quit_pipe = lttng_pipe_open(FD_CLOEXEC);
	if (!quit_pipe) {
		goto error;
	}
	notifiers->quit_pipe = quit_pipe;
	notifiers->consumer_data = consumer_data;
	sem_init(&notifiers->ready, 0, 0);

	thread = lttng_thread_create("Consumer management",
			thread_consumer_management,
			shutdown_consumer_management_thread,
			cleanup_consumer_management_thread,
			notifiers);
	if (!thread) {
		goto error;
	}
	wait_until_thread_is_ready(notifiers);
	lttng_thread_put(thread);
	if (notifiers->initialization_result) {
		return false;
	}
	return true;
error:
	cleanup_consumer_management_thread(notifiers);
error_alloc:
	return false;
}