sessiond: clean-up: typo in ust-app.c comment
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.c
1 /*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
#include <errno.h>
#include <signal.h>

#include <common/pipe.h>
#include <common/utils.h>

#include "manage-consumer.h"
#include "testpoint.h"
#include "health-sessiond.h"
#include "utils.h"
#include "thread.h"
#include "ust-consumer.h"
31
/*
 * State shared between the launcher and one consumer management thread.
 */
struct thread_notifiers {
	/* Pipe whose read end the thread polls to learn that it must quit. */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon description the thread operates on. */
	struct consumer_data *consumer_data;
	/* Posted once by the thread when its initialization outcome is known. */
	sem_t ready;
	/* 0 when the thread initialized successfully, -1 when it failed. */
	int initialization_result;
};
38
39 static void mark_thread_as_ready(struct thread_notifiers *notifiers)
40 {
41 DBG("Marking consumer management thread as ready");
42 notifiers->initialization_result = 0;
43 sem_post(&notifiers->ready);
44 }
45
46 static void mark_thread_intialization_as_failed(
47 struct thread_notifiers *notifiers)
48 {
49 ERR("Consumer management thread entering error state");
50 notifiers->initialization_result = -1;
51 sem_post(&notifiers->ready);
52 }
53
54 static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
55 {
56 DBG("Waiting for consumer management thread to be ready");
57 sem_wait(&notifiers->ready);
58 DBG("Consumer management thread is ready");
59 }
60
61 /*
62 * This thread manage the consumer error sent back to the session daemon.
63 */
64 void *thread_consumer_management(void *data)
65 {
66 int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
67 uint32_t revents, nb_fd;
68 enum lttcomm_return_code code;
69 struct lttng_poll_event events;
70 struct thread_notifiers *notifiers = data;
71 struct consumer_data *consumer_data = notifiers->consumer_data;
72 const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
73 struct consumer_socket *cmd_socket_wrapper = NULL;
74
75 DBG("[thread] Manage consumer started");
76
77 rcu_register_thread();
78 rcu_thread_online();
79
80 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
81
82 health_code_update();
83
84 /*
85 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
86 * metadata_sock. Nothing more will be added to this poll set.
87 */
88 ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
89 if (ret < 0) {
90 mark_thread_intialization_as_failed(notifiers);
91 goto error_poll;
92 }
93
94 ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
95 if (ret < 0) {
96 mark_thread_intialization_as_failed(notifiers);
97 goto error;
98 }
99
100 /*
101 * The error socket here is already in a listening state which was done
102 * just before spawning this thread to avoid a race between the consumer
103 * daemon exec trying to connect and the listen() call.
104 */
105 ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
106 if (ret < 0) {
107 mark_thread_intialization_as_failed(notifiers);
108 goto error;
109 }
110
111 health_code_update();
112
113 /* Infinite blocking call, waiting for transmission */
114 health_poll_entry();
115
116 if (testpoint(sessiond_thread_manage_consumer)) {
117 mark_thread_intialization_as_failed(notifiers);
118 goto error;
119 }
120
121 ret = lttng_poll_wait(&events, -1);
122 health_poll_exit();
123 if (ret < 0) {
124 mark_thread_intialization_as_failed(notifiers);
125 goto error;
126 }
127
128 nb_fd = ret;
129
130 for (i = 0; i < nb_fd; i++) {
131 /* Fetch once the poll data */
132 revents = LTTNG_POLL_GETEV(&events, i);
133 pollfd = LTTNG_POLL_GETFD(&events, i);
134
135 health_code_update();
136
137 /* Thread quit pipe has been closed. Killing thread. */
138 if (pollfd == quit_pipe_read_fd) {
139 err = 0;
140 mark_thread_intialization_as_failed(notifiers);
141 goto exit;
142 } else if (pollfd == consumer_data->err_sock) {
143 /* Event on the registration socket */
144 if (revents & LPOLLIN) {
145 continue;
146 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
147 ERR("consumer err socket poll error");
148 mark_thread_intialization_as_failed(notifiers);
149 goto error;
150 } else {
151 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
152 mark_thread_intialization_as_failed(notifiers);
153 goto error;
154 }
155 }
156 }
157
158 sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
159 if (sock < 0) {
160 mark_thread_intialization_as_failed(notifiers);
161 goto error;
162 }
163
164 /*
165 * Set the CLOEXEC flag. Return code is useless because either way, the
166 * show must go on.
167 */
168 (void) utils_set_fd_cloexec(sock);
169
170 health_code_update();
171
172 DBG2("Receiving code from consumer err_sock");
173
174 /* Getting status code from kconsumerd */
175 ret = lttcomm_recv_unix_sock(sock, &code,
176 sizeof(enum lttcomm_return_code));
177 if (ret <= 0) {
178 mark_thread_intialization_as_failed(notifiers);
179 goto error;
180 }
181
182 health_code_update();
183 if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
184 ERR("consumer error when waiting for SOCK_READY : %s",
185 lttcomm_get_readable_code(-code));
186 mark_thread_intialization_as_failed(notifiers);
187 goto error;
188 }
189
190 /* Connect both command and metadata sockets. */
191 consumer_data->cmd_sock =
192 lttcomm_connect_unix_sock(
193 consumer_data->cmd_unix_sock_path);
194 consumer_data->metadata_fd =
195 lttcomm_connect_unix_sock(
196 consumer_data->cmd_unix_sock_path);
197 if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
198 PERROR("consumer connect cmd socket");
199 mark_thread_intialization_as_failed(notifiers);
200 goto error;
201 }
202
203 consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
204
205 /* Create metadata socket lock. */
206 consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
207 if (consumer_data->metadata_sock.lock == NULL) {
208 PERROR("zmalloc pthread mutex");
209 mark_thread_intialization_as_failed(notifiers);
210 goto error;
211 }
212 pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
213
214 DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
215 DBG("Consumer metadata socket ready (fd: %d)",
216 consumer_data->metadata_fd);
217
218 /*
219 * Remove the consumerd error sock since we've established a connection.
220 */
221 ret = lttng_poll_del(&events, consumer_data->err_sock);
222 if (ret < 0) {
223 mark_thread_intialization_as_failed(notifiers);
224 goto error;
225 }
226
227 /* Add new accepted error socket. */
228 ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
229 if (ret < 0) {
230 mark_thread_intialization_as_failed(notifiers);
231 goto error;
232 }
233
234 /* Add metadata socket that is successfully connected. */
235 ret = lttng_poll_add(&events, consumer_data->metadata_fd,
236 LPOLLIN | LPOLLRDHUP);
237 if (ret < 0) {
238 mark_thread_intialization_as_failed(notifiers);
239 goto error;
240 }
241
242 health_code_update();
243
244 /*
245 * Transfer the write-end of the channel monitoring pipe to the consumer
246 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
247 */
248 cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
249 if (!cmd_socket_wrapper) {
250 mark_thread_intialization_as_failed(notifiers);
251 goto error;
252 }
253 cmd_socket_wrapper->lock = &consumer_data->lock;
254
255 pthread_mutex_lock(cmd_socket_wrapper->lock);
256 ret = consumer_init(cmd_socket_wrapper, sessiond_uuid);
257 if (ret) {
258 ERR("Failed to send sessiond uuid to consumer daemon");
259 mark_thread_intialization_as_failed(notifiers);
260 pthread_mutex_unlock(cmd_socket_wrapper->lock);
261 goto error;
262 }
263 pthread_mutex_unlock(cmd_socket_wrapper->lock);
264
265 ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
266 consumer_data->channel_monitor_pipe);
267 if (ret) {
268 mark_thread_intialization_as_failed(notifiers);
269 goto error;
270 }
271
272 /* Discard the socket wrapper as it is no longer needed. */
273 consumer_destroy_socket(cmd_socket_wrapper);
274 cmd_socket_wrapper = NULL;
275
276 /* The thread is completely initialized, signal that it is ready. */
277 mark_thread_as_ready(notifiers);
278
279 /* Infinite blocking call, waiting for transmission */
280 while (1) {
281 health_code_update();
282
283 /* Exit the thread because the thread quit pipe has been triggered. */
284 if (should_quit) {
285 /* Not a health error. */
286 err = 0;
287 goto exit;
288 }
289
290 health_poll_entry();
291 ret = lttng_poll_wait(&events, -1);
292 health_poll_exit();
293 if (ret < 0) {
294 goto error;
295 }
296
297 nb_fd = ret;
298
299 for (i = 0; i < nb_fd; i++) {
300 /* Fetch once the poll data */
301 revents = LTTNG_POLL_GETEV(&events, i);
302 pollfd = LTTNG_POLL_GETFD(&events, i);
303
304 health_code_update();
305
306 /*
307 * Thread quit pipe has been triggered, flag that we should stop
308 * but continue the current loop to handle potential data from
309 * consumer.
310 */
311 if (pollfd == quit_pipe_read_fd) {
312 should_quit = 1;
313 } else if (pollfd == sock) {
314 /* Event on the consumerd socket */
315 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
316 && !(revents & LPOLLIN)) {
317 ERR("consumer err socket second poll error");
318 goto error;
319 }
320 health_code_update();
321 /* Wait for any kconsumerd error */
322 ret = lttcomm_recv_unix_sock(sock, &code,
323 sizeof(enum lttcomm_return_code));
324 if (ret <= 0) {
325 ERR("consumer closed the command socket");
326 goto error;
327 }
328
329 ERR("consumer return code : %s",
330 lttcomm_get_readable_code(-code));
331
332 goto exit;
333 } else if (pollfd == consumer_data->metadata_fd) {
334 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
335 && !(revents & LPOLLIN)) {
336 ERR("consumer err metadata socket second poll error");
337 goto error;
338 }
339 /* UST metadata requests */
340 ret = ust_consumer_metadata_request(
341 &consumer_data->metadata_sock);
342 if (ret < 0) {
343 ERR("Handling metadata request");
344 goto error;
345 }
346 }
347 /* No need for an else branch all FDs are tested prior. */
348 }
349 health_code_update();
350 }
351
352 exit:
353 error:
354 /*
355 * We lock here because we are about to close the sockets and some other
356 * thread might be using them so get exclusive access which will abort all
357 * other consumer command by other threads.
358 */
359 pthread_mutex_lock(&consumer_data->lock);
360
361 /* Immediately set the consumerd state to stopped */
362 if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
363 uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
364 } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
365 consumer_data->type == LTTNG_CONSUMER32_UST) {
366 uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
367 } else {
368 /* Code flow error... */
369 assert(0);
370 }
371
372 if (consumer_data->err_sock >= 0) {
373 ret = close(consumer_data->err_sock);
374 if (ret) {
375 PERROR("close");
376 }
377 consumer_data->err_sock = -1;
378 }
379 if (consumer_data->cmd_sock >= 0) {
380 ret = close(consumer_data->cmd_sock);
381 if (ret) {
382 PERROR("close");
383 }
384 consumer_data->cmd_sock = -1;
385 }
386 if (consumer_data->metadata_sock.fd_ptr &&
387 *consumer_data->metadata_sock.fd_ptr >= 0) {
388 ret = close(*consumer_data->metadata_sock.fd_ptr);
389 if (ret) {
390 PERROR("close");
391 }
392 }
393 if (sock >= 0) {
394 ret = close(sock);
395 if (ret) {
396 PERROR("close");
397 }
398 }
399
400 unlink(consumer_data->err_unix_sock_path);
401 unlink(consumer_data->cmd_unix_sock_path);
402 pthread_mutex_unlock(&consumer_data->lock);
403
404 /* Cleanup metadata socket mutex. */
405 if (consumer_data->metadata_sock.lock) {
406 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
407 free(consumer_data->metadata_sock.lock);
408 }
409 lttng_poll_clean(&events);
410
411 if (cmd_socket_wrapper) {
412 consumer_destroy_socket(cmd_socket_wrapper);
413 }
414 error_poll:
415 if (err) {
416 health_error();
417 ERR("Health error occurred in %s", __func__);
418 }
419 health_unregister(health_sessiond);
420 DBG("consumer thread cleanup completed");
421
422 rcu_thread_offline();
423 rcu_unregister_thread();
424
425 return NULL;
426 }
427
428 static bool shutdown_consumer_management_thread(void *data)
429 {
430 struct thread_notifiers *notifiers = data;
431 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
432
433 return notify_thread_pipe(write_fd) == 1;
434 }
435
436 static void cleanup_consumer_management_thread(void *data)
437 {
438 struct thread_notifiers *notifiers = data;
439
440 lttng_pipe_destroy(notifiers->quit_pipe);
441 free(notifiers);
442 }
443
444 bool launch_consumer_management_thread(struct consumer_data *consumer_data)
445 {
446 struct lttng_pipe *quit_pipe;
447 struct thread_notifiers *notifiers = NULL;
448 struct lttng_thread *thread;
449
450 notifiers = zmalloc(sizeof(*notifiers));
451 if (!notifiers) {
452 goto error_alloc;
453 }
454
455 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
456 if (!quit_pipe) {
457 goto error;
458 }
459 notifiers->quit_pipe = quit_pipe;
460 notifiers->consumer_data = consumer_data;
461 sem_init(&notifiers->ready, 0, 0);
462
463 thread = lttng_thread_create("Consumer management",
464 thread_consumer_management,
465 shutdown_consumer_management_thread,
466 cleanup_consumer_management_thread,
467 notifiers);
468 if (!thread) {
469 goto error;
470 }
471 wait_until_thread_is_ready(notifiers);
472 lttng_thread_put(thread);
473 if (notifiers->initialization_result) {
474 return false;
475 }
476 return true;
477 error:
478 cleanup_consumer_management_thread(notifiers);
479 error_alloc:
480 return false;
481 }
This page took 0.04049 seconds and 4 git commands to generate.