consumerd: send a buffer static sample on flush command
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.cpp
CommitLineData
4ec029ed 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4ec029ed 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
4ec029ed 7 *
4ec029ed
JG
8 */
9
#include <errno.h>
#include <signal.h>

#include <common/pipe.hpp>
#include <common/utils.hpp>

#include "manage-consumer.hpp"
#include "testpoint.hpp"
#include "health-sessiond.hpp"
#include "utils.hpp"
#include "thread.hpp"
#include "ust-consumer.hpp"
4ec029ed 21
namespace {
/*
 * State shared between launch_consumer_management_thread() and the consumer
 * management thread it spawns.
 *
 * The structure is allocated with zmalloc<>(), so it must remain a plain
 * aggregate (no constructors or default member initializers).
 */
struct thread_notifiers {
	/* Pipe whose read end is polled by the thread to learn it must quit. */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon state operated on by the thread. */
	struct consumer_data *consumer_data;
	/* Posted by the thread once the outcome of its initialization is known. */
	sem_t ready;
	/* 0 on success, -1 on failure; meaningful once `ready` has been posted. */
	int initialization_result;
};
} /* namespace */
4ec029ed
JG
30
31static void mark_thread_as_ready(struct thread_notifiers *notifiers)
32{
33 DBG("Marking consumer management thread as ready");
34 notifiers->initialization_result = 0;
35 sem_post(&notifiers->ready);
36}
37
38static void mark_thread_intialization_as_failed(
39 struct thread_notifiers *notifiers)
40{
52c50f8f 41 ERR("Consumer management thread entering error state");
4ec029ed
JG
42 notifiers->initialization_result = -1;
43 sem_post(&notifiers->ready);
44}
45
46static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
47{
48 DBG("Waiting for consumer management thread to be ready");
49 sem_wait(&notifiers->ready);
50 DBG("Consumer management thread is ready");
51}
52
53/*
54 * This thread manage the consumer error sent back to the session daemon.
55 */
0e0b3d3a 56static void *thread_consumer_management(void *data)
4ec029ed
JG
57{
58 int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
59 uint32_t revents, nb_fd;
60 enum lttcomm_return_code code;
61 struct lttng_poll_event events;
7966af57 62 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed
JG
63 struct consumer_data *consumer_data = notifiers->consumer_data;
64 const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
65 struct consumer_socket *cmd_socket_wrapper = NULL;
66
67 DBG("[thread] Manage consumer started");
68
69 rcu_register_thread();
70 rcu_thread_online();
71
412d7227 72 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
4ec029ed
JG
73
74 health_code_update();
75
76 /*
77 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
78 * metadata_sock. Nothing more will be added to this poll set.
79 */
80 ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
81 if (ret < 0) {
82 mark_thread_intialization_as_failed(notifiers);
83 goto error_poll;
84 }
85
86 ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
87 if (ret < 0) {
88 mark_thread_intialization_as_failed(notifiers);
89 goto error;
90 }
91
92 /*
93 * The error socket here is already in a listening state which was done
94 * just before spawning this thread to avoid a race between the consumer
95 * daemon exec trying to connect and the listen() call.
96 */
97 ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
98 if (ret < 0) {
99 mark_thread_intialization_as_failed(notifiers);
100 goto error;
101 }
102
103 health_code_update();
104
105 /* Infinite blocking call, waiting for transmission */
106 health_poll_entry();
107
108 if (testpoint(sessiond_thread_manage_consumer)) {
109 mark_thread_intialization_as_failed(notifiers);
110 goto error;
111 }
112
113 ret = lttng_poll_wait(&events, -1);
114 health_poll_exit();
115 if (ret < 0) {
116 mark_thread_intialization_as_failed(notifiers);
117 goto error;
118 }
119
120 nb_fd = ret;
121
122 for (i = 0; i < nb_fd; i++) {
123 /* Fetch once the poll data */
124 revents = LTTNG_POLL_GETEV(&events, i);
125 pollfd = LTTNG_POLL_GETFD(&events, i);
126
127 health_code_update();
128
4ec029ed
JG
129 /* Thread quit pipe has been closed. Killing thread. */
130 if (pollfd == quit_pipe_read_fd) {
131 err = 0;
132 mark_thread_intialization_as_failed(notifiers);
133 goto exit;
134 } else if (pollfd == consumer_data->err_sock) {
135 /* Event on the registration socket */
136 if (revents & LPOLLIN) {
137 continue;
138 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
139 ERR("consumer err socket poll error");
140 mark_thread_intialization_as_failed(notifiers);
141 goto error;
142 } else {
143 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
144 mark_thread_intialization_as_failed(notifiers);
145 goto error;
146 }
147 }
148 }
149
150 sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
151 if (sock < 0) {
152 mark_thread_intialization_as_failed(notifiers);
153 goto error;
154 }
155
156 /*
157 * Set the CLOEXEC flag. Return code is useless because either way, the
158 * show must go on.
159 */
160 (void) utils_set_fd_cloexec(sock);
161
162 health_code_update();
163
164 DBG2("Receiving code from consumer err_sock");
165
166 /* Getting status code from kconsumerd */
167 ret = lttcomm_recv_unix_sock(sock, &code,
168 sizeof(enum lttcomm_return_code));
169 if (ret <= 0) {
170 mark_thread_intialization_as_failed(notifiers);
171 goto error;
172 }
173
174 health_code_update();
175 if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
176 ERR("consumer error when waiting for SOCK_READY : %s",
7966af57 177 lttcomm_get_readable_code((lttcomm_return_code) -code));
4ec029ed
JG
178 mark_thread_intialization_as_failed(notifiers);
179 goto error;
180 }
181
182 /* Connect both command and metadata sockets. */
183 consumer_data->cmd_sock =
184 lttcomm_connect_unix_sock(
185 consumer_data->cmd_unix_sock_path);
186 consumer_data->metadata_fd =
187 lttcomm_connect_unix_sock(
188 consumer_data->cmd_unix_sock_path);
189 if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
190 PERROR("consumer connect cmd socket");
191 mark_thread_intialization_as_failed(notifiers);
192 goto error;
193 }
194
195 consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
196
197 /* Create metadata socket lock. */
64803277 198 consumer_data->metadata_sock.lock = zmalloc<pthread_mutex_t>();
4ec029ed
JG
199 if (consumer_data->metadata_sock.lock == NULL) {
200 PERROR("zmalloc pthread mutex");
201 mark_thread_intialization_as_failed(notifiers);
202 goto error;
203 }
204 pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
205
206 DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
207 DBG("Consumer metadata socket ready (fd: %d)",
208 consumer_data->metadata_fd);
209
210 /*
211 * Remove the consumerd error sock since we've established a connection.
212 */
213 ret = lttng_poll_del(&events, consumer_data->err_sock);
214 if (ret < 0) {
215 mark_thread_intialization_as_failed(notifiers);
216 goto error;
217 }
218
219 /* Add new accepted error socket. */
220 ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
221 if (ret < 0) {
222 mark_thread_intialization_as_failed(notifiers);
223 goto error;
224 }
225
226 /* Add metadata socket that is successfully connected. */
227 ret = lttng_poll_add(&events, consumer_data->metadata_fd,
228 LPOLLIN | LPOLLRDHUP);
229 if (ret < 0) {
230 mark_thread_intialization_as_failed(notifiers);
231 goto error;
232 }
233
234 health_code_update();
235
236 /*
09ede842
JG
237 * Transfer the write-end of the channel monitoring pipe to the consumer
238 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
4ec029ed
JG
239 */
240 cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
241 if (!cmd_socket_wrapper) {
242 mark_thread_intialization_as_failed(notifiers);
243 goto error;
244 }
245 cmd_socket_wrapper->lock = &consumer_data->lock;
246
09ede842 247 pthread_mutex_lock(cmd_socket_wrapper->lock);
412d7227 248 ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
09ede842
JG
249 if (ret) {
250 ERR("Failed to send sessiond uuid to consumer daemon");
251 mark_thread_intialization_as_failed(notifiers);
252 pthread_mutex_unlock(cmd_socket_wrapper->lock);
253 goto error;
254 }
255 pthread_mutex_unlock(cmd_socket_wrapper->lock);
256
4ec029ed
JG
257 ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
258 consumer_data->channel_monitor_pipe);
259 if (ret) {
260 mark_thread_intialization_as_failed(notifiers);
261 goto error;
262 }
263
264 /* Discard the socket wrapper as it is no longer needed. */
265 consumer_destroy_socket(cmd_socket_wrapper);
266 cmd_socket_wrapper = NULL;
267
268 /* The thread is completely initialized, signal that it is ready. */
269 mark_thread_as_ready(notifiers);
270
271 /* Infinite blocking call, waiting for transmission */
272 while (1) {
273 health_code_update();
274
275 /* Exit the thread because the thread quit pipe has been triggered. */
276 if (should_quit) {
277 /* Not a health error. */
278 err = 0;
279 goto exit;
280 }
281
282 health_poll_entry();
283 ret = lttng_poll_wait(&events, -1);
284 health_poll_exit();
285 if (ret < 0) {
286 goto error;
287 }
288
289 nb_fd = ret;
290
291 for (i = 0; i < nb_fd; i++) {
292 /* Fetch once the poll data */
293 revents = LTTNG_POLL_GETEV(&events, i);
294 pollfd = LTTNG_POLL_GETFD(&events, i);
295
296 health_code_update();
297
4ec029ed
JG
298 /*
299 * Thread quit pipe has been triggered, flag that we should stop
300 * but continue the current loop to handle potential data from
301 * consumer.
302 */
303 if (pollfd == quit_pipe_read_fd) {
304 should_quit = 1;
305 } else if (pollfd == sock) {
306 /* Event on the consumerd socket */
307 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
308 && !(revents & LPOLLIN)) {
309 ERR("consumer err socket second poll error");
310 goto error;
311 }
312 health_code_update();
313 /* Wait for any kconsumerd error */
314 ret = lttcomm_recv_unix_sock(sock, &code,
315 sizeof(enum lttcomm_return_code));
316 if (ret <= 0) {
317 ERR("consumer closed the command socket");
318 goto error;
319 }
320
321 ERR("consumer return code : %s",
7966af57 322 lttcomm_get_readable_code((lttcomm_return_code) -code));
4ec029ed
JG
323
324 goto exit;
325 } else if (pollfd == consumer_data->metadata_fd) {
326 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
327 && !(revents & LPOLLIN)) {
328 ERR("consumer err metadata socket second poll error");
329 goto error;
330 }
331 /* UST metadata requests */
332 ret = ust_consumer_metadata_request(
333 &consumer_data->metadata_sock);
334 if (ret < 0) {
335 ERR("Handling metadata request");
336 goto error;
337 }
338 }
339 /* No need for an else branch all FDs are tested prior. */
340 }
341 health_code_update();
342 }
343
344exit:
345error:
346 /*
347 * We lock here because we are about to close the sockets and some other
348 * thread might be using them so get exclusive access which will abort all
349 * other consumer command by other threads.
350 */
351 pthread_mutex_lock(&consumer_data->lock);
352
353 /* Immediately set the consumerd state to stopped */
354 if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
412d7227 355 uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
4ec029ed
JG
356 } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
357 consumer_data->type == LTTNG_CONSUMER32_UST) {
412d7227 358 uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
4ec029ed
JG
359 } else {
360 /* Code flow error... */
a0377dfe 361 abort();
4ec029ed
JG
362 }
363
364 if (consumer_data->err_sock >= 0) {
365 ret = close(consumer_data->err_sock);
366 if (ret) {
367 PERROR("close");
368 }
369 consumer_data->err_sock = -1;
370 }
371 if (consumer_data->cmd_sock >= 0) {
372 ret = close(consumer_data->cmd_sock);
373 if (ret) {
374 PERROR("close");
375 }
376 consumer_data->cmd_sock = -1;
377 }
378 if (consumer_data->metadata_sock.fd_ptr &&
379 *consumer_data->metadata_sock.fd_ptr >= 0) {
380 ret = close(*consumer_data->metadata_sock.fd_ptr);
381 if (ret) {
382 PERROR("close");
383 }
384 }
385 if (sock >= 0) {
386 ret = close(sock);
387 if (ret) {
388 PERROR("close");
389 }
390 }
391
392 unlink(consumer_data->err_unix_sock_path);
393 unlink(consumer_data->cmd_unix_sock_path);
394 pthread_mutex_unlock(&consumer_data->lock);
395
396 /* Cleanup metadata socket mutex. */
397 if (consumer_data->metadata_sock.lock) {
398 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
399 free(consumer_data->metadata_sock.lock);
400 }
401 lttng_poll_clean(&events);
402
403 if (cmd_socket_wrapper) {
404 consumer_destroy_socket(cmd_socket_wrapper);
405 }
406error_poll:
407 if (err) {
408 health_error();
409 ERR("Health error occurred in %s", __func__);
410 }
412d7227 411 health_unregister(the_health_sessiond);
4ec029ed
JG
412 DBG("consumer thread cleanup completed");
413
414 rcu_thread_offline();
415 rcu_unregister_thread();
416
417 return NULL;
418}
419
420static bool shutdown_consumer_management_thread(void *data)
421{
7966af57 422 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed
JG
423 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
424
425 return notify_thread_pipe(write_fd) == 1;
426}
427
428static void cleanup_consumer_management_thread(void *data)
429{
7966af57 430 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed
JG
431
432 lttng_pipe_destroy(notifiers->quit_pipe);
433 free(notifiers);
434}
435
436bool launch_consumer_management_thread(struct consumer_data *consumer_data)
437{
438 struct lttng_pipe *quit_pipe;
439 struct thread_notifiers *notifiers = NULL;
440 struct lttng_thread *thread;
441
64803277 442 notifiers = zmalloc<thread_notifiers>();
4ec029ed 443 if (!notifiers) {
21fa020e
JG
444 goto error_alloc;
445 }
446
447 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
448 if (!quit_pipe) {
4ec029ed
JG
449 goto error;
450 }
451 notifiers->quit_pipe = quit_pipe;
452 notifiers->consumer_data = consumer_data;
453 sem_init(&notifiers->ready, 0, 0);
454
455 thread = lttng_thread_create("Consumer management",
456 thread_consumer_management,
457 shutdown_consumer_management_thread,
458 cleanup_consumer_management_thread,
459 notifiers);
460 if (!thread) {
461 goto error;
462 }
463 wait_until_thread_is_ready(notifiers);
464 lttng_thread_put(thread);
465 if (notifiers->initialization_result) {
bd3739b0 466 return false;
4ec029ed
JG
467 }
468 return true;
469error:
470 cleanup_consumer_management_thread(notifiers);
21fa020e 471error_alloc:
4ec029ed
JG
472 return false;
473}
This page took 0.065608 seconds and 4 git commands to generate.