01a963c25d45c026119a2300d78780ac82772736
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
1 /*
2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <lttng/trigger/trigger.h>
20 #include <common/error.h>
21 #include <common/config/session-config.h>
22 #include <common/defaults.h>
23 #include <common/utils.h>
24 #include <common/futex.h>
25 #include <common/align.h>
26 #include <common/time.h>
27 #include <common/hashtable/utils.h>
28 #include <sys/eventfd.h>
29 #include <sys/stat.h>
30 #include <time.h>
31 #include <signal.h>
32 #include <inttypes.h>
33
34 #include <common/kernel-ctl/kernel-ctl.h>
35 #include <lttng/notification/channel-internal.h>
36 #include <lttng/rotate-internal.h>
37
38 #include "rotation-thread.h"
39 #include "lttng-sessiond.h"
40 #include "health-sessiond.h"
41 #include "rotate.h"
42 #include "cmd.h"
43 #include "session.h"
44 #include "sessiond-timer.h"
45 #include "notification-thread-commands.h"
46
47 #include <urcu.h>
48 #include <urcu/list.h>
49 #include <urcu/rculfhash.h>
50
51 /*
52 * Store a struct rotation_channel_info for each channel that is currently
53 * being rotated by the consumer.
54 */
55 struct cds_lfht *channel_pending_rotate_ht;
56
57 struct lttng_notification_channel *rotate_notification_channel = NULL;
58
59 struct rotation_thread_state {
60 struct lttng_poll_event events;
61 };
62
63 static
64 void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
65 {
66 assert(channel_info);
67 free(channel_info);
68 }
69
70 static
71 int match_channel_info(struct cds_lfht_node *node, const void *key)
72 {
73 struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
74 struct rotation_channel_info *channel_info;
75
76 channel_info = caa_container_of(node, struct rotation_channel_info,
77 rotate_channels_ht_node);
78
79 return !!((channel_key->key == channel_info->channel_key.key) &&
80 (channel_key->domain == channel_info->channel_key.domain));
81 }
82
83 static
84 struct rotation_channel_info *lookup_channel_pending(uint64_t key,
85 enum lttng_domain_type domain)
86 {
87 struct cds_lfht_iter iter;
88 struct cds_lfht_node *node;
89 struct rotation_channel_info *channel_info = NULL;
90 struct rotation_channel_key channel_key = { .key = key,
91 .domain = domain };
92
93 cds_lfht_lookup(channel_pending_rotate_ht,
94 hash_channel_key(&channel_key),
95 match_channel_info,
96 &channel_key, &iter);
97 node = cds_lfht_iter_get_node(&iter);
98 if (!node) {
99 goto end;
100 }
101
102 channel_info = caa_container_of(node, struct rotation_channel_info,
103 rotate_channels_ht_node);
104 cds_lfht_del(channel_pending_rotate_ht, node);
105 end:
106 return channel_info;
107 }
108
109 /*
110 * Destroy the thread data previously created by the init function.
111 */
112 void rotation_thread_handle_destroy(
113 struct rotation_thread_handle *handle)
114 {
115 int ret;
116
117 if (!handle) {
118 goto end;
119 }
120
121 if (handle->ust32_consumer >= 0) {
122 ret = close(handle->ust32_consumer);
123 if (ret) {
124 PERROR("close 32-bit consumer channel rotation pipe");
125 }
126 }
127 if (handle->ust64_consumer >= 0) {
128 ret = close(handle->ust64_consumer);
129 if (ret) {
130 PERROR("close 64-bit consumer channel rotation pipe");
131 }
132 }
133 if (handle->kernel_consumer >= 0) {
134 ret = close(handle->kernel_consumer);
135 if (ret) {
136 PERROR("close kernel consumer channel rotation pipe");
137 }
138 }
139
140 end:
141 free(handle);
142 }
143
144 struct rotation_thread_handle *rotation_thread_handle_create(
145 struct lttng_pipe *ust32_channel_rotate_pipe,
146 struct lttng_pipe *ust64_channel_rotate_pipe,
147 struct lttng_pipe *kernel_channel_rotate_pipe,
148 int thread_quit_pipe,
149 struct rotation_thread_timer_queue *rotation_timer_queue,
150 struct notification_thread_handle *notification_thread_handle,
151 sem_t *notification_thread_ready)
152 {
153 struct rotation_thread_handle *handle;
154
155 handle = zmalloc(sizeof(*handle));
156 if (!handle) {
157 goto end;
158 }
159
160 if (ust32_channel_rotate_pipe) {
161 handle->ust32_consumer =
162 lttng_pipe_release_readfd(
163 ust32_channel_rotate_pipe);
164 if (handle->ust32_consumer < 0) {
165 goto error;
166 }
167 } else {
168 handle->ust32_consumer = -1;
169 }
170 if (ust64_channel_rotate_pipe) {
171 handle->ust64_consumer =
172 lttng_pipe_release_readfd(
173 ust64_channel_rotate_pipe);
174 if (handle->ust64_consumer < 0) {
175 goto error;
176 }
177 } else {
178 handle->ust64_consumer = -1;
179 }
180 if (kernel_channel_rotate_pipe) {
181 handle->kernel_consumer =
182 lttng_pipe_release_readfd(
183 kernel_channel_rotate_pipe);
184 if (handle->kernel_consumer < 0) {
185 goto error;
186 }
187 } else {
188 handle->kernel_consumer = -1;
189 }
190 handle->thread_quit_pipe = thread_quit_pipe;
191 handle->rotation_timer_queue = rotation_timer_queue;
192 handle->notification_thread_handle = notification_thread_handle;
193 handle->notification_thread_ready = notification_thread_ready;
194
195 end:
196 return handle;
197 error:
198 rotation_thread_handle_destroy(handle);
199 return NULL;
200 }
201
202 static
203 int init_poll_set(struct lttng_poll_event *poll_set,
204 struct rotation_thread_handle *handle)
205 {
206 int ret;
207
208 /*
209 * Create pollset with size 5:
210 * - sessiond quit pipe
211 * - sessiond timer pipe,
212 * - consumerd (32-bit user space) channel rotate pipe,
213 * - consumerd (64-bit user space) channel rotate pipe,
214 * - consumerd (kernel) channel rotate pipe,
215 */
216 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
217 if (ret < 0) {
218 goto end;
219 }
220
221 ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
222 LPOLLIN | LPOLLERR);
223 if (ret < 0) {
224 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
225 goto error;
226 }
227 ret = lttng_poll_add(poll_set,
228 lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe),
229 LPOLLIN | LPOLLERR);
230 if (ret < 0) {
231 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
232 goto error;
233 }
234 ret = lttng_poll_add(poll_set, handle->ust32_consumer,
235 LPOLLIN | LPOLLERR);
236 if (ret < 0) {
237 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
238 goto error;
239 }
240 ret = lttng_poll_add(poll_set, handle->ust64_consumer,
241 LPOLLIN | LPOLLERR);
242 if (ret < 0) {
243 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
244 goto error;
245 }
246 if (handle->kernel_consumer >= 0) {
247 ret = lttng_poll_add(poll_set, handle->kernel_consumer,
248 LPOLLIN | LPOLLERR);
249 if (ret < 0) {
250 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
251 goto error;
252 }
253 }
254
255 end:
256 return ret;
257 error:
258 lttng_poll_clean(poll_set);
259 return ret;
260 }
261
262 static
263 void fini_thread_state(struct rotation_thread_state *state)
264 {
265 int ret;
266
267 lttng_poll_clean(&state->events);
268 ret = cds_lfht_destroy(channel_pending_rotate_ht, NULL);
269 assert(!ret);
270 if (rotate_notification_channel) {
271 lttng_notification_channel_destroy(rotate_notification_channel);
272 }
273 }
274
275 static
276 int init_thread_state(struct rotation_thread_handle *handle,
277 struct rotation_thread_state *state)
278 {
279 int ret;
280
281 memset(state, 0, sizeof(*state));
282 lttng_poll_init(&state->events);
283
284 ret = init_poll_set(&state->events, handle);
285 if (ret) {
286 ERR("[rotation-thread] Failed to initialize rotation thread poll set");
287 goto end;
288 }
289
290 channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
291 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
292 if (!channel_pending_rotate_ht) {
293 ERR("[rotation-thread] Failed to create channel pending rotation hash table");
294 ret = -1;
295 goto end;
296 }
297
298 /*
299 * We wait until the notification thread is ready to create the
300 * notification channel and add it to the poll_set.
301 */
302 sem_wait(handle->notification_thread_ready);
303 rotate_notification_channel = lttng_notification_channel_create(
304 lttng_session_daemon_notification_endpoint);
305 if (!rotate_notification_channel) {
306 ERR("[rotation-thread] Could not create notification channel");
307 ret = -1;
308 goto end;
309 }
310 ret = lttng_poll_add(&state->events, rotate_notification_channel->socket,
311 LPOLLIN | LPOLLERR);
312 if (ret < 0) {
313 ERR("[rotation-thread] Failed to add notification fd to pollset");
314 goto end;
315 }
316
317 end:
318 return ret;
319 }
320
321 static
322 int handle_channel_rotation_pipe(int fd, uint32_t revents,
323 struct rotation_thread_handle *handle,
324 struct rotation_thread_state *state)
325 {
326 int ret = 0;
327 enum lttng_domain_type domain;
328 struct rotation_channel_info *channel_info;
329 struct ltt_session *session = NULL;
330 uint64_t key;
331
332 if (fd == handle->ust32_consumer ||
333 fd == handle->ust64_consumer) {
334 domain = LTTNG_DOMAIN_UST;
335 } else if (fd == handle->kernel_consumer) {
336 domain = LTTNG_DOMAIN_KERNEL;
337 } else {
338 ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
339 fd);
340 abort();
341 }
342
343 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
344 ret = lttng_poll_del(&state->events, fd);
345 if (ret) {
346 ERR("[rotation-thread] Failed to remove consumer "
347 "rotation pipe from poll set");
348 }
349 goto end;
350 }
351
352 do {
353 ret = read(fd, &key, sizeof(key));
354 } while (ret == -1 && errno == EINTR);
355 if (ret != sizeof(key)) {
356 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
357 fd);
358 ret = -1;
359 goto end;
360 }
361
362 DBG("[rotation-thread] Received notification for chan %" PRIu64
363 ", domain %d", key, domain);
364
365 channel_info = lookup_channel_pending(key, domain);
366 if (!channel_info) {
367 ERR("[rotation-thread] Failed to find channel_info (key = %"
368 PRIu64 ")", key);
369 ret = -1;
370 goto end;
371 }
372 rcu_read_lock();
373 session_lock_list();
374 session = session_find_by_id(channel_info->session_id);
375 if (!session) {
376 /*
377 * The session may have been destroyed before we had a chance to
378 * perform this action, return gracefully.
379 */
380 DBG("[rotation-thread] Session %" PRIu64 " not found",
381 channel_info->session_id);
382 ret = 0;
383 goto end_unlock_session_list;
384 }
385
386 session_lock(session);
387 if (--session->nr_chan_rotate_pending == 0) {
388 time_t now = time(NULL);
389
390 if (now == (time_t) -1) {
391 session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
392 ret = LTTNG_ERR_UNK;
393 goto end_unlock_session;
394 }
395
396 ret = rename_complete_chunk(session, now);
397 if (ret < 0) {
398 ERR("Failed to rename completed rotation chunk");
399 goto end_unlock_session;
400 }
401 session->rotate_pending = false;
402 session->last_chunk_start_ts = session->current_chunk_start_ts;
403 if (session->rotate_pending_relay) {
404 ret = sessiond_timer_rotate_pending_start(
405 session,
406 DEFAULT_ROTATE_PENDING_RELAY_TIMER);
407 if (ret) {
408 ERR("Failed to enable rotate pending timer");
409 ret = -1;
410 goto end_unlock_session;
411 }
412 } else {
413 struct lttng_trace_archive_location *location;
414
415 session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
416 /* Ownership of location is transferred. */
417 location = session_get_trace_archive_location(session);
418 ret = notification_thread_command_session_rotation_completed(
419 notification_thread_handle,
420 session->name,
421 session->uid,
422 session->gid,
423 session->current_archive_id,
424 location);
425 if (ret != LTTNG_OK) {
426 ERR("Failed to notify notification thread that rotation is complete for session %s",
427 session->name);
428 }
429
430 }
431 DBG("Rotation completed for session %s", session->name);
432 }
433
434 ret = 0;
435
436 end_unlock_session:
437 channel_rotation_info_destroy(channel_info);
438 session_unlock(session);
439 end_unlock_session_list:
440 session_unlock_list();
441 rcu_read_unlock();
442 end:
443 return ret;
444 }
445
446 /*
447 * Process the rotate_pending check, called with session lock held.
448 */
449 static
450 int rotate_pending_relay_timer(struct ltt_session *session)
451 {
452 int ret;
453
454 DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
455 session->id);
456 ret = relay_rotate_pending(session, session->current_archive_id - 1);
457 if (ret < 0) {
458 ERR("[rotation-thread] Check relay rotate pending");
459 goto end;
460 }
461 if (ret == 0) {
462 struct lttng_trace_archive_location *location;
463
464 DBG("[rotation-thread] Rotation completed on the relay for "
465 "session %" PRIu64, session->id);
466 /*
467 * Now we can clear the pending flag in the session. New
468 * rotations can start now.
469 */
470 session->rotate_pending_relay = false;
471 session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
472
473 session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
474 /* Ownership of location is transferred. */
475 location = session_get_trace_archive_location(session);
476 ret = notification_thread_command_session_rotation_completed(
477 notification_thread_handle,
478 session->name,
479 session->uid,
480 session->gid,
481 session->current_archive_id,
482 location);
483 if (ret != LTTNG_OK) {
484 ERR("Failed to notify notification thread that rotation is complete for session %s",
485 session->name);
486 }
487 } else if (ret == 1) {
488 DBG("[rotation-thread] Rotation still pending on the relay for "
489 "session %" PRIu64, session->id);
490 ret = sessiond_timer_rotate_pending_start(session,
491 DEFAULT_ROTATE_PENDING_RELAY_TIMER);
492 if (ret) {
493 ERR("Re-enabling rotate pending timer");
494 ret = -1;
495 goto end;
496 }
497 }
498
499 ret = 0;
500
501 end:
502 return ret;
503 }
504
505 /*
506 * Process the rotate_timer, called with session lock held.
507 */
508 static
509 int rotate_timer(struct ltt_session *session)
510 {
511 int ret;
512
513 /*
514 * Complete _at most_ one scheduled rotation on a stopped session.
515 */
516 if (!session->active && session->rotate_timer_enabled &&
517 session->rotated_after_last_stop) {
518 ret = 0;
519 goto end;
520 }
521
522 /* Ignore this timer if a rotation is already in progress. */
523 if (session->rotate_pending || session->rotate_pending_relay) {
524 ret = 0;
525 goto end;
526 }
527
528 DBG("[rotation-thread] Rotate timer on session %s", session->name);
529
530 ret = cmd_rotate_session(session, NULL);
531 if (ret == -LTTNG_ERR_ROTATION_PENDING) {
532 DBG("Scheduled rotation aborted since a rotation is already in progress");
533 ret = 0;
534 goto end;
535 } else if (ret != LTTNG_OK) {
536 ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret);
537 ret = -1;
538 goto end;
539 }
540
541 ret = 0;
542
543 end:
544 return ret;
545 }
546
547 static
548 int handle_rotate_timer_pipe(uint32_t revents,
549 struct rotation_thread_handle *handle,
550 struct rotation_thread_state *state,
551 struct rotation_thread_timer_queue *queue)
552 {
553 int ret = 0;
554 int fd = lttng_pipe_get_readfd(queue->event_pipe);
555 struct ltt_session *session;
556 char buf[1];
557
558 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
559 ret = lttng_poll_del(&state->events, fd);
560 if (ret) {
561 ERR("[rotation-thread] Failed to remove consumer "
562 "rotate pending pipe from poll set");
563 }
564 goto end;
565 }
566
567 ret = lttng_read(fd, buf, 1);
568 if (ret != 1) {
569 ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
570 ret = -1;
571 goto end;
572 }
573
574 for (;;) {
575 struct sessiond_rotation_timer *timer_data;
576
577 /*
578 * Take the queue lock only to pop elements from the list.
579 */
580 pthread_mutex_lock(&queue->lock);
581 if (cds_list_empty(&queue->list)) {
582 pthread_mutex_unlock(&queue->lock);
583 break;
584 }
585 timer_data = cds_list_first_entry(&queue->list,
586 struct sessiond_rotation_timer, head);
587 cds_list_del(&timer_data->head);
588 pthread_mutex_unlock(&queue->lock);
589
590 /*
591 * session lock to lookup the session ID.
592 */
593 session_lock_list();
594 session = session_find_by_id(timer_data->session_id);
595 if (!session) {
596 DBG("[rotation-thread] Session %" PRIu64 " not found",
597 timer_data->session_id);
598 /*
599 * This is a non-fatal error, and we cannot report it to the
600 * user (timer), so just print the error and continue the
601 * processing.
602 */
603 session_unlock_list();
604 free(timer_data);
605 continue;
606 }
607
608 /*
609 * Take the session lock and release the session_list lock.
610 */
611 session_lock(session);
612 session_unlock_list();
613
614 if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
615 ret = rotate_pending_relay_timer(session);
616 } else if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
617 ret = rotate_timer(session);
618 } else {
619 ERR("Unknown signal in rotate timer %d", timer_data->signal);
620 ret = -1;
621 }
622 session_unlock(session);
623 free(timer_data);
624 if (ret) {
625 ERR("Error processing timer");
626 goto end;
627 }
628 }
629
630 ret = 0;
631
632 end:
633 return ret;
634 }
635
636 int handle_condition(
637 const struct lttng_condition *condition,
638 const struct lttng_evaluation *evaluation,
639 struct notification_thread_handle *notification_thread_handle)
640 {
641 int ret = 0;
642 const char *condition_session_name = NULL;
643 enum lttng_condition_type condition_type;
644 enum lttng_condition_status condition_status;
645 enum lttng_evaluation_status evaluation_status;
646 uint64_t consumed;
647 struct ltt_session *session;
648
649 condition_type = lttng_condition_get_type(condition);
650
651 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
652 ret = -1;
653 ERR("[rotation-thread] Condition type and session usage type are not the same");
654 goto end;
655 }
656
657 /* Fetch info to test */
658 condition_status = lttng_condition_session_consumed_size_get_session_name(
659 condition, &condition_session_name);
660 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
661 ERR("[rotation-thread] Session name could not be fetched");
662 ret = -1;
663 goto end;
664 }
665 evaluation_status = lttng_evaluation_session_consumed_size_get_consumed_size(evaluation,
666 &consumed);
667 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
668 ERR("[rotation-thread] Failed to get evaluation");
669 ret = -1;
670 goto end;
671 }
672
673 session_lock_list();
674 session = session_find_by_name(condition_session_name);
675 if (!session) {
676 ret = -1;
677 session_unlock_list();
678 ERR("[rotation-thread] Session \"%s\" not found",
679 condition_session_name);
680 goto end;
681 }
682 session_lock(session);
683 session_unlock_list();
684
685 ret = unsubscribe_session_consumed_size_rotation(session,
686 notification_thread_handle);
687 if (ret) {
688 goto end;
689 }
690
691 ret = cmd_rotate_session(session, NULL);
692 if (ret == -LTTNG_ERR_ROTATION_PENDING) {
693 DBG("Rotate already pending, subscribe to the next threshold value");
694 } else if (ret != LTTNG_OK) {
695 ERR("[rotation-thread] Failed to rotate on size notification with error: %s",
696 lttng_strerror(ret));
697 ret = -1;
698 goto end_unlock;
699 }
700 ret = subscribe_session_consumed_size_rotation(session,
701 consumed + session->rotate_size,
702 notification_thread_handle);
703 if (ret) {
704 ERR("[rotation-thread] Failed to subscribe to session consumed size condition");
705 goto end_unlock;
706 }
707 ret = 0;
708
709 end_unlock:
710 session_unlock(session);
711 end:
712 return ret;
713 }
714
715 static
716 int handle_notification_channel(int fd, uint32_t revents,
717 struct rotation_thread_handle *handle,
718 struct rotation_thread_state *state)
719 {
720 int ret;
721 bool notification_pending;
722 struct lttng_notification *notification = NULL;
723 enum lttng_notification_channel_status status;
724 const struct lttng_evaluation *notification_evaluation;
725 const struct lttng_condition *notification_condition;
726
727 status = lttng_notification_channel_has_pending_notification(
728 rotate_notification_channel, &notification_pending);
729 if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
730 ERR("[rotation-thread ]Error occured while checking for pending notification");
731 ret = -1;
732 goto end;
733 }
734
735 if (!notification_pending) {
736 ret = 0;
737 goto end;
738 }
739
740 /* Receive the next notification. */
741 status = lttng_notification_channel_get_next_notification(
742 rotate_notification_channel,
743 &notification);
744
745 switch (status) {
746 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
747 break;
748 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
749 /* Not an error, we will wait for the next one */
750 ret = 0;
751 goto end;;
752 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
753 ERR("Notification channel was closed");
754 ret = -1;
755 goto end;
756 default:
757 /* Unhandled conditions / errors. */
758 ERR("Unknown notification channel status");
759 ret = -1;
760 goto end;
761 }
762
763 notification_condition = lttng_notification_get_condition(notification);
764 notification_evaluation = lttng_notification_get_evaluation(notification);
765
766 ret = handle_condition(notification_condition, notification_evaluation,
767 handle->notification_thread_handle);
768
769 end:
770 lttng_notification_destroy(notification);
771 return ret;
772 }
773
774 void *thread_rotation(void *data)
775 {
776 int ret;
777 struct rotation_thread_handle *handle = data;
778 struct rotation_thread_state state;
779
780 DBG("[rotation-thread] Started rotation thread");
781
782 if (!handle) {
783 ERR("[rotation-thread] Invalid thread context provided");
784 goto end;
785 }
786
787 rcu_register_thread();
788 rcu_thread_online();
789
790 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
791 health_code_update();
792
793 ret = init_thread_state(handle, &state);
794 if (ret) {
795 goto end;
796 }
797
798 /* Ready to handle client connections. */
799 sessiond_notify_ready();
800
801 while (true) {
802 int fd_count, i;
803
804 health_poll_entry();
805 DBG("[rotation-thread] Entering poll wait");
806 ret = lttng_poll_wait(&state.events, -1);
807 DBG("[rotation-thread] Poll wait returned (%i)", ret);
808 health_poll_exit();
809 if (ret < 0) {
810 /*
811 * Restart interrupted system call.
812 */
813 if (errno == EINTR) {
814 continue;
815 }
816 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret);
817 goto error;
818 }
819
820 fd_count = ret;
821 for (i = 0; i < fd_count; i++) {
822 int fd = LTTNG_POLL_GETFD(&state.events, i);
823 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
824
825 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
826 fd, revents);
827
828 if (fd == handle->thread_quit_pipe) {
829 DBG("[rotation-thread] Quit pipe activity");
830 goto exit;
831 } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) {
832 ret = handle_rotate_timer_pipe(revents,
833 handle, &state, handle->rotation_timer_queue);
834 if (ret) {
835 ERR("[rotation-thread] Failed to handle rotation timer pipe event");
836 goto error;
837 }
838 } else if (fd == handle->ust32_consumer ||
839 fd == handle->ust64_consumer ||
840 fd == handle->kernel_consumer) {
841 ret = handle_channel_rotation_pipe(fd,
842 revents, handle, &state);
843 if (ret) {
844 ERR("[rotation-thread] Failed to handle channel rotation pipe");
845 goto error;
846 }
847 } else if (fd == rotate_notification_channel->socket) {
848 ret = handle_notification_channel(fd, revents,
849 handle, &state);
850 if (ret) {
851 ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
852 goto error;
853 }
854 }
855 }
856 }
857 exit:
858 error:
859 DBG("[rotation-thread] Exit");
860 fini_thread_state(&state);
861 health_unregister(health_sessiond);
862 rcu_thread_offline();
863 rcu_unregister_thread();
864 end:
865 return NULL;
866 }
This page took 0.057403 seconds and 4 git commands to generate.