Fix: rotation state marked as completed before relayd has completed
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
1 /*
2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <lttng/trigger/trigger.h>
20 #include <common/error.h>
21 #include <common/config/session-config.h>
22 #include <common/defaults.h>
23 #include <common/utils.h>
24 #include <common/futex.h>
25 #include <common/align.h>
26 #include <common/time.h>
27 #include <common/hashtable/utils.h>
28 #include <sys/eventfd.h>
29 #include <sys/stat.h>
30 #include <time.h>
31 #include <signal.h>
32 #include <inttypes.h>
33
34 #include <common/kernel-ctl/kernel-ctl.h>
35 #include <lttng/notification/channel-internal.h>
36 #include <lttng/rotate-internal.h>
37
38 #include "rotation-thread.h"
39 #include "lttng-sessiond.h"
40 #include "health-sessiond.h"
41 #include "rotate.h"
42 #include "cmd.h"
43 #include "session.h"
44 #include "sessiond-timer.h"
45
46 #include <urcu.h>
47 #include <urcu/list.h>
48 #include <urcu/rculfhash.h>
49
50 /*
51 * Store a struct rotation_channel_info for each channel that is currently
52 * being rotated by the consumer.
53 */
54 struct cds_lfht *channel_pending_rotate_ht;
55
56 struct lttng_notification_channel *rotate_notification_channel = NULL;
57
58 struct rotation_thread_state {
59 struct lttng_poll_event events;
60 };
61
62 static
63 void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
64 {
65 assert(channel_info);
66 free(channel_info);
67 }
68
69 static
70 int match_channel_info(struct cds_lfht_node *node, const void *key)
71 {
72 struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
73 struct rotation_channel_info *channel_info;
74
75 channel_info = caa_container_of(node, struct rotation_channel_info,
76 rotate_channels_ht_node);
77
78 return !!((channel_key->key == channel_info->channel_key.key) &&
79 (channel_key->domain == channel_info->channel_key.domain));
80 }
81
82 static
83 struct rotation_channel_info *lookup_channel_pending(uint64_t key,
84 enum lttng_domain_type domain)
85 {
86 struct cds_lfht_iter iter;
87 struct cds_lfht_node *node;
88 struct rotation_channel_info *channel_info = NULL;
89 struct rotation_channel_key channel_key = { .key = key,
90 .domain = domain };
91
92 cds_lfht_lookup(channel_pending_rotate_ht,
93 hash_channel_key(&channel_key),
94 match_channel_info,
95 &channel_key, &iter);
96 node = cds_lfht_iter_get_node(&iter);
97 if (!node) {
98 goto end;
99 }
100
101 channel_info = caa_container_of(node, struct rotation_channel_info,
102 rotate_channels_ht_node);
103 cds_lfht_del(channel_pending_rotate_ht, node);
104 end:
105 return channel_info;
106 }
107
108 /*
109 * Destroy the thread data previously created by the init function.
110 */
111 void rotation_thread_handle_destroy(
112 struct rotation_thread_handle *handle)
113 {
114 int ret;
115
116 if (!handle) {
117 goto end;
118 }
119
120 if (handle->ust32_consumer >= 0) {
121 ret = close(handle->ust32_consumer);
122 if (ret) {
123 PERROR("close 32-bit consumer channel rotation pipe");
124 }
125 }
126 if (handle->ust64_consumer >= 0) {
127 ret = close(handle->ust64_consumer);
128 if (ret) {
129 PERROR("close 64-bit consumer channel rotation pipe");
130 }
131 }
132 if (handle->kernel_consumer >= 0) {
133 ret = close(handle->kernel_consumer);
134 if (ret) {
135 PERROR("close kernel consumer channel rotation pipe");
136 }
137 }
138
139 end:
140 free(handle);
141 }
142
143 struct rotation_thread_handle *rotation_thread_handle_create(
144 struct lttng_pipe *ust32_channel_rotate_pipe,
145 struct lttng_pipe *ust64_channel_rotate_pipe,
146 struct lttng_pipe *kernel_channel_rotate_pipe,
147 int thread_quit_pipe,
148 struct rotation_thread_timer_queue *rotation_timer_queue,
149 struct notification_thread_handle *notification_thread_handle,
150 sem_t *notification_thread_ready)
151 {
152 struct rotation_thread_handle *handle;
153
154 handle = zmalloc(sizeof(*handle));
155 if (!handle) {
156 goto end;
157 }
158
159 if (ust32_channel_rotate_pipe) {
160 handle->ust32_consumer =
161 lttng_pipe_release_readfd(
162 ust32_channel_rotate_pipe);
163 if (handle->ust32_consumer < 0) {
164 goto error;
165 }
166 } else {
167 handle->ust32_consumer = -1;
168 }
169 if (ust64_channel_rotate_pipe) {
170 handle->ust64_consumer =
171 lttng_pipe_release_readfd(
172 ust64_channel_rotate_pipe);
173 if (handle->ust64_consumer < 0) {
174 goto error;
175 }
176 } else {
177 handle->ust64_consumer = -1;
178 }
179 if (kernel_channel_rotate_pipe) {
180 handle->kernel_consumer =
181 lttng_pipe_release_readfd(
182 kernel_channel_rotate_pipe);
183 if (handle->kernel_consumer < 0) {
184 goto error;
185 }
186 } else {
187 handle->kernel_consumer = -1;
188 }
189 handle->thread_quit_pipe = thread_quit_pipe;
190 handle->rotation_timer_queue = rotation_timer_queue;
191 handle->notification_thread_handle = notification_thread_handle;
192 handle->notification_thread_ready = notification_thread_ready;
193
194 end:
195 return handle;
196 error:
197 rotation_thread_handle_destroy(handle);
198 return NULL;
199 }
200
201 static
202 int init_poll_set(struct lttng_poll_event *poll_set,
203 struct rotation_thread_handle *handle)
204 {
205 int ret;
206
207 /*
208 * Create pollset with size 5:
209 * - sessiond quit pipe
210 * - sessiond timer pipe,
211 * - consumerd (32-bit user space) channel rotate pipe,
212 * - consumerd (64-bit user space) channel rotate pipe,
213 * - consumerd (kernel) channel rotate pipe,
214 */
215 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
216 if (ret < 0) {
217 goto end;
218 }
219
220 ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
221 LPOLLIN | LPOLLERR);
222 if (ret < 0) {
223 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
224 goto error;
225 }
226 ret = lttng_poll_add(poll_set,
227 lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
228 LPOLLIN | LPOLLERR);
229 if (ret < 0) {
230 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
231 goto error;
232 }
233 ret = lttng_poll_add(poll_set, handle->ust32_consumer,
234 LPOLLIN | LPOLLERR);
235 if (ret < 0) {
236 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
237 goto error;
238 }
239 ret = lttng_poll_add(poll_set, handle->ust64_consumer,
240 LPOLLIN | LPOLLERR);
241 if (ret < 0) {
242 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
243 goto error;
244 }
245 if (handle->kernel_consumer >= 0) {
246 ret = lttng_poll_add(poll_set, handle->kernel_consumer,
247 LPOLLIN | LPOLLERR);
248 if (ret < 0) {
249 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
250 goto error;
251 }
252 }
253
254 end:
255 return ret;
256 error:
257 lttng_poll_clean(poll_set);
258 return ret;
259 }
260
261 static
262 void fini_thread_state(struct rotation_thread_state *state)
263 {
264 lttng_poll_clean(&state->events);
265 cds_lfht_destroy(channel_pending_rotate_ht, NULL);
266 if (rotate_notification_channel) {
267 lttng_notification_channel_destroy(rotate_notification_channel);
268 }
269 }
270
271 static
272 int init_thread_state(struct rotation_thread_handle *handle,
273 struct rotation_thread_state *state)
274 {
275 int ret;
276
277 memset(state, 0, sizeof(*state));
278 lttng_poll_init(&state->events);
279
280 ret = init_poll_set(&state->events, handle);
281 if (ret) {
282 ERR("[rotation-thread] Failed to initialize rotation thread poll set");
283 goto end;
284 }
285
286 channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
287 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
288 if (!channel_pending_rotate_ht) {
289 ERR("[rotation-thread] Failed to create channel pending rotation hash table");
290 ret = -1;
291 goto end;
292 }
293
294 /*
295 * We wait until the notification thread is ready to create the
296 * notification channel and add it to the poll_set.
297 */
298 sem_wait(handle->notification_thread_ready);
299 rotate_notification_channel = lttng_notification_channel_create(
300 lttng_session_daemon_notification_endpoint);
301 if (!rotate_notification_channel) {
302 ERR("[rotation-thread] Could not create notification channel");
303 ret = -1;
304 goto end;
305 }
306 ret = lttng_poll_add(&state->events, rotate_notification_channel->socket,
307 LPOLLIN | LPOLLERR);
308 if (ret < 0) {
309 ERR("[rotation-thread] Failed to add notification fd to pollset");
310 goto end;
311 }
312
313 end:
314 return ret;
315 }
316
317 static
318 int handle_channel_rotation_pipe(int fd, uint32_t revents,
319 struct rotation_thread_handle *handle,
320 struct rotation_thread_state *state)
321 {
322 int ret = 0;
323 enum lttng_domain_type domain;
324 struct rotation_channel_info *channel_info;
325 struct ltt_session *session = NULL;
326 uint64_t key;
327
328 if (fd == handle->ust32_consumer ||
329 fd == handle->ust64_consumer) {
330 domain = LTTNG_DOMAIN_UST;
331 } else if (fd == handle->kernel_consumer) {
332 domain = LTTNG_DOMAIN_KERNEL;
333 } else {
334 ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
335 fd);
336 abort();
337 }
338
339 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
340 ret = lttng_poll_del(&state->events, fd);
341 if (ret) {
342 ERR("[rotation-thread] Failed to remove consumer "
343 "rotation pipe from poll set");
344 }
345 goto end;
346 }
347
348 do {
349 ret = read(fd, &key, sizeof(key));
350 } while (ret == -1 && errno == EINTR);
351 if (ret != sizeof(key)) {
352 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
353 fd);
354 ret = -1;
355 goto end;
356 }
357
358 DBG("[rotation-thread] Received notification for chan %" PRIu64
359 ", domain %d", key, domain);
360
361 channel_info = lookup_channel_pending(key, domain);
362 if (!channel_info) {
363 ERR("[rotation-thread] Failed to find channel_info (key = %"
364 PRIu64 ")", key);
365 ret = -1;
366 goto end;
367 }
368 rcu_read_lock();
369 session_lock_list();
370 session = session_find_by_id(channel_info->session_id);
371 if (!session) {
372 /*
373 * The session may have been destroyed before we had a chance to
374 * perform this action, return gracefully.
375 */
376 DBG("[rotation-thread] Session %" PRIu64 " not found",
377 channel_info->session_id);
378 ret = 0;
379 goto end_unlock_session_list;
380 }
381
382 session_lock(session);
383 if (--session->nr_chan_rotate_pending == 0) {
384 time_t now = time(NULL);
385
386 if (now == (time_t) -1) {
387 session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
388 ret = LTTNG_ERR_UNK;
389 goto end_unlock_session;
390 }
391
392 ret = rename_complete_chunk(session, now);
393 if (ret < 0) {
394 ERR("Failed to rename completed rotation chunk");
395 goto end_unlock_session;
396 }
397 session->rotate_pending = false;
398 session->last_chunk_start_ts = session->current_chunk_start_ts;
399 if (session->rotate_pending_relay) {
400 ret = sessiond_timer_rotate_pending_start(
401 session,
402 DEFAULT_ROTATE_PENDING_RELAY_TIMER);
403 if (ret) {
404 ERR("Failed to enable rotate pending timer");
405 ret = -1;
406 goto end_unlock_session;
407 }
408 } else {
409 session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
410 }
411 DBG("Rotation completed for session %s", session->name);
412 }
413
414 ret = 0;
415
416 end_unlock_session:
417 channel_rotation_info_destroy(channel_info);
418 session_unlock(session);
419 end_unlock_session_list:
420 session_unlock_list();
421 rcu_read_unlock();
422 end:
423 return ret;
424 }
425
426 /*
427 * Process the rotate_pending check, called with session lock held.
428 */
429 static
430 int rotate_pending_relay_timer(struct ltt_session *session)
431 {
432 int ret;
433
434 DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
435 session->id);
436 ret = relay_rotate_pending(session, session->rotate_count - 1);
437 if (ret < 0) {
438 ERR("[rotation-thread] Check relay rotate pending");
439 goto end;
440 }
441 if (ret == 0) {
442 DBG("[rotation-thread] Rotation completed on the relay for "
443 "session %" PRIu64, session->id);
444 /*
445 * Now we can clear the pending flag in the session. New
446 * rotations can start now.
447 */
448 session->rotate_pending_relay = false;
449 session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
450 } else if (ret == 1) {
451 DBG("[rotation-thread] Rotation still pending on the relay for "
452 "session %" PRIu64, session->id);
453 ret = sessiond_timer_rotate_pending_start(session,
454 DEFAULT_ROTATE_PENDING_RELAY_TIMER);
455 if (ret) {
456 ERR("Re-enabling rotate pending timer");
457 ret = -1;
458 goto end;
459 }
460 }
461
462 ret = 0;
463
464 end:
465 return ret;
466 }
467
468 /*
469 * Process the rotate_timer, called with session lock held.
470 */
471 static
472 int rotate_timer(struct ltt_session *session)
473 {
474 int ret;
475
476 /*
477 * Complete _at most_ one scheduled rotation on a stopped session.
478 */
479 if (!session->active && session->rotate_timer_enabled &&
480 session->rotated_after_last_stop) {
481 ret = 0;
482 goto end;
483 }
484
485 /* Ignore this timer if a rotation is already in progress. */
486 if (session->rotate_pending || session->rotate_pending_relay) {
487 ret = 0;
488 goto end;
489 }
490
491 DBG("[rotation-thread] Rotate timer on session %s", session->name);
492
493 ret = cmd_rotate_session(session, NULL);
494 if (ret == -LTTNG_ERR_ROTATION_PENDING) {
495 DBG("Scheduled rotation aborted since a rotation is already in progress");
496 ret = 0;
497 goto end;
498 } else if (ret != LTTNG_OK) {
499 ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret);
500 ret = -1;
501 goto end;
502 }
503
504 ret = 0;
505
506 end:
507 return ret;
508 }
509
510 static
511 int handle_rotate_timer_pipe(uint32_t revents,
512 struct rotation_thread_handle *handle,
513 struct rotation_thread_state *state,
514 struct rotation_thread_timer_queue *queue)
515 {
516 int ret = 0;
517 int fd = lttng_pipe_get_readfd(queue->event_pipe);
518 struct ltt_session *session;
519 char buf[1];
520
521 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
522 ret = lttng_poll_del(&state->events, fd);
523 if (ret) {
524 ERR("[rotation-thread] Failed to remove consumer "
525 "rotate pending pipe from poll set");
526 }
527 goto end;
528 }
529
530 ret = lttng_read(fd, buf, 1);
531 if (ret != 1) {
532 ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
533 ret = -1;
534 goto end;
535 }
536
537 for (;;) {
538 struct sessiond_rotation_timer *timer_data;
539
540 /*
541 * Take the queue lock only to pop elements from the list.
542 */
543 pthread_mutex_lock(&queue->lock);
544 if (cds_list_empty(&queue->list)) {
545 pthread_mutex_unlock(&queue->lock);
546 break;
547 }
548 timer_data = cds_list_first_entry(&queue->list,
549 struct sessiond_rotation_timer, head);
550 cds_list_del(&timer_data->head);
551 pthread_mutex_unlock(&queue->lock);
552
553 /*
554 * session lock to lookup the session ID.
555 */
556 session_lock_list();
557 session = session_find_by_id(timer_data->session_id);
558 if (!session) {
559 DBG("[rotation-thread] Session %" PRIu64 " not found",
560 timer_data->session_id);
561 /*
562 * This is a non-fatal error, and we cannot report it to the
563 * user (timer), so just print the error and continue the
564 * processing.
565 */
566 session_unlock_list();
567 free(timer_data);
568 continue;
569 }
570
571 /*
572 * Take the session lock and release the session_list lock.
573 */
574 session_lock(session);
575 session_unlock_list();
576
577 if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
578 ret = rotate_pending_relay_timer(session);
579 } else if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
580 ret = rotate_timer(session);
581 } else {
582 ERR("Unknown signal in rotate timer %d", timer_data->signal);
583 ret = -1;
584 }
585 session_unlock(session);
586 free(timer_data);
587 if (ret) {
588 ERR("Error processing timer");
589 goto end;
590 }
591 }
592
593 ret = 0;
594
595 end:
596 return ret;
597 }
598
599 int handle_condition(
600 const struct lttng_condition *condition,
601 const struct lttng_evaluation *evaluation,
602 struct notification_thread_handle *notification_thread_handle)
603 {
604 int ret = 0;
605 const char *condition_session_name = NULL;
606 enum lttng_condition_type condition_type;
607 enum lttng_condition_status condition_status;
608 enum lttng_evaluation_status evaluation_status;
609 uint64_t consumed;
610 struct ltt_session *session;
611
612 condition_type = lttng_condition_get_type(condition);
613
614 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
615 ret = -1;
616 ERR("[rotation-thread] Condition type and session usage type are not the same");
617 goto end;
618 }
619
620 /* Fetch info to test */
621 condition_status = lttng_condition_session_consumed_size_get_session_name(
622 condition, &condition_session_name);
623 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
624 ERR("[rotation-thread] Session name could not be fetched");
625 ret = -1;
626 goto end;
627 }
628 evaluation_status = lttng_evaluation_session_consumed_size_get_consumed_size(evaluation,
629 &consumed);
630 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
631 ERR("[rotation-thread] Failed to get evaluation");
632 ret = -1;
633 goto end;
634 }
635
636 session_lock_list();
637 session = session_find_by_name(condition_session_name);
638 if (!session) {
639 ret = -1;
640 session_unlock_list();
641 ERR("[rotation-thread] Session \"%s\" not found",
642 condition_session_name);
643 goto end;
644 }
645 session_lock(session);
646 session_unlock_list();
647
648 ret = unsubscribe_session_consumed_size_rotation(session,
649 notification_thread_handle);
650 if (ret) {
651 goto end;
652 }
653
654 ret = cmd_rotate_session(session, NULL);
655 if (ret == -LTTNG_ERR_ROTATION_PENDING) {
656 DBG("Rotate already pending, subscribe to the next threshold value");
657 ret = 0;
658 } else if (ret != LTTNG_OK) {
659 ERR("[rotation-thread] Failed to rotate on size notification with error: %s",
660 lttng_strerror(ret));
661 ret = -1;
662 goto end_unlock;
663 }
664 ret = subscribe_session_consumed_size_rotation(session,
665 consumed + session->rotate_size,
666 notification_thread_handle);
667 if (ret) {
668 ERR("[rotation-thread] Failed to subscribe to session consumed size condition");
669 goto end_unlock;
670 }
671 ret = 0;
672
673 end_unlock:
674 session_unlock(session);
675 end:
676 return ret;
677 }
678
679 static
680 int handle_notification_channel(int fd, uint32_t revents,
681 struct rotation_thread_handle *handle,
682 struct rotation_thread_state *state)
683 {
684 int ret;
685 struct lttng_notification *notification;
686 enum lttng_notification_channel_status status;
687 const struct lttng_evaluation *notification_evaluation;
688 const struct lttng_condition *notification_condition;
689
690 /* Receive the next notification. */
691 status = lttng_notification_channel_get_next_notification(
692 rotate_notification_channel,
693 &notification);
694
695 switch (status) {
696 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
697 break;
698 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
699 /* Not an error, we will wait for the next one */
700 ret = 0;
701 goto end;;
702 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
703 ERR("Notification channel was closed");
704 ret = -1;
705 goto end;
706 default:
707 /* Unhandled conditions / errors. */
708 ERR("Unknown notification channel status");
709 ret = -1;
710 goto end;
711 }
712
713 notification_condition = lttng_notification_get_condition(notification);
714 notification_evaluation = lttng_notification_get_evaluation(notification);
715
716 ret = handle_condition(notification_condition, notification_evaluation,
717 handle->notification_thread_handle);
718
719 end:
720 lttng_notification_destroy(notification);
721 if (ret != 0) {
722 goto end;
723 }
724
725
726 return ret;
727 }
728
729 void *thread_rotation(void *data)
730 {
731 int ret;
732 struct rotation_thread_handle *handle = data;
733 struct rotation_thread_state state;
734
735 DBG("[rotation-thread] Started rotation thread");
736
737 if (!handle) {
738 ERR("[rotation-thread] Invalid thread context provided");
739 goto end;
740 }
741
742 rcu_register_thread();
743 rcu_thread_online();
744
745 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
746 health_code_update();
747
748 ret = init_thread_state(handle, &state);
749 if (ret) {
750 goto end;
751 }
752
753 /* Ready to handle client connections. */
754 sessiond_notify_ready();
755
756 while (true) {
757 int fd_count, i;
758
759 health_poll_entry();
760 DBG("[rotation-thread] Entering poll wait");
761 ret = lttng_poll_wait(&state.events, -1);
762 DBG("[rotation-thread] Poll wait returned (%i)", ret);
763 health_poll_exit();
764 if (ret < 0) {
765 /*
766 * Restart interrupted system call.
767 */
768 if (errno == EINTR) {
769 continue;
770 }
771 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret);
772 goto error;
773 }
774
775 fd_count = ret;
776 for (i = 0; i < fd_count; i++) {
777 int fd = LTTNG_POLL_GETFD(&state.events, i);
778 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
779
780 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
781 fd, revents);
782
783 if (fd == handle->thread_quit_pipe) {
784 DBG("[rotation-thread] Quit pipe activity");
785 goto exit;
786 } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
787 ret = handle_rotate_timer_pipe(revents,
788 handle, &state, handle->rotation_timer_queue);
789 if (ret) {
790 ERR("[rotation-thread] Failed to handle rotation timer pipe event");
791 goto error;
792 }
793 } else if (fd == handle->ust32_consumer ||
794 fd == handle->ust64_consumer ||
795 fd == handle->kernel_consumer) {
796 ret = handle_channel_rotation_pipe(fd,
797 revents, handle, &state);
798 if (ret) {
799 ERR("[rotation-thread] Handle channel rotation pipe");
800 goto error;
801 }
802 } else if (fd == rotate_notification_channel->socket) {
803 ret = handle_notification_channel(fd, revents,
804 handle, &state);
805 if (ret) {
806 ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
807 goto error;
808 }
809 }
810 }
811 }
812 exit:
813 error:
814 DBG("[rotation-thread] Exit");
815 fini_thread_state(&state);
816 health_unregister(health_sessiond);
817 rcu_thread_offline();
818 rcu_unregister_thread();
819 end:
820 return NULL;
821 }
This page took 0.054838 seconds and 4 git commands to generate.