2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 #include <lttng/trigger/trigger.h>
20 #include <common/error.h>
21 #include <common/config/session-config.h>
22 #include <common/defaults.h>
23 #include <common/utils.h>
24 #include <common/futex.h>
25 #include <common/align.h>
26 #include <common/time.h>
27 #include <common/hashtable/utils.h>
28 #include <sys/eventfd.h>
34 #include <common/kernel-ctl/kernel-ctl.h>
35 #include <lttng/notification/channel-internal.h>
36 #include <lttng/rotate-internal.h>
38 #include "rotation-thread.h"
39 #include "lttng-sessiond.h"
40 #include "health-sessiond.h"
44 #include "sessiond-timer.h"
45 #include "notification-thread-commands.h"
48 #include <urcu/list.h>
49 #include <urcu/rculfhash.h>
52 * Store a struct rotation_channel_info for each channel that is currently
53 * being rotated by the consumer.
/*
 * Hash table of channels with a rotation pending. It is created in
 * init_thread_state() and destroyed in fini_thread_state();
 * lookup_channel_pending() looks entries up in it and removes them.
 */
55 struct cds_lfht
*channel_pending_rotate_ht
;
/*
 * Notification channel used by the rotation thread. It stays NULL until
 * init_thread_state() creates it; fini_thread_state() destroys it when set.
 */
57 struct lttng_notification_channel
*rotate_notification_channel
= NULL
;
/*
 * State of the rotation thread. The only member visible in this view is the
 * poll set (events) the thread waits on; the remainder of the definition is
 * not shown here.
 */
59 struct rotation_thread_state
{
60 struct lttng_poll_event events
;
/*
 * Dispose of a struct rotation_channel_info.
 *
 * handle_channel_rotation_pipe() calls this on the entry it obtained from
 * lookup_channel_pending() once it has finished processing it.
 *
 * NOTE(review): the body is not visible in this view; presumably it frees
 * channel_info - confirm.
 */
64 void channel_rotation_info_destroy(struct rotation_channel_info
*channel_info
)
/*
 * Compare a hash table node against a channel key.
 *
 * node: node embedded in a struct rotation_channel_info as its
 *       rotate_channels_ht_node member.
 * key:  pointer to the struct rotation_channel_key being searched for.
 *
 * Returns 1 when both the key and the domain of the searched key are equal to
 * the ones stored in the entry, 0 otherwise.
 *
 * NOTE(review): presumably used as the match callback of cds_lfht_lookup() in
 * lookup_channel_pending() - the call site argument is not visible here.
 */
71 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
73 struct rotation_channel_key
*channel_key
= (struct rotation_channel_key
*) key
;
74 struct rotation_channel_info
*channel_info
;
/* Recover the entry that embeds this hash table node. */
76 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
77 rotate_channels_ht_node
);
/* The double negation normalizes the result to exactly 0 or 1. */
79 return !!((channel_key
->key
== channel_info
->channel_key
.key
) &&
80 (channel_key
->domain
== channel_info
->channel_key
.domain
));
/*
 * Look up a channel with a pending rotation and take it out of the table.
 *
 * key:    channel key to search for.
 * domain: tracing domain of the channel.
 *
 * The entry is searched in channel_pending_rotate_ht using
 * hash_channel_key() on a key built from the arguments. The node found is
 * converted back to its struct rotation_channel_info and deleted from the
 * hash table, so the entry is no longer reachable through the table once this
 * function has run.
 *
 * NOTE(review): the return statement and the not-found check are not visible
 * in this view; presumably the function returns channel_info, which is
 * initialized to NULL, and the caller becomes responsible for destroying it -
 * confirm.
 */
84 struct rotation_channel_info
*lookup_channel_pending(uint64_t key
,
85 enum lttng_domain_type domain
)
87 struct cds_lfht_iter iter
;
88 struct cds_lfht_node
*node
;
89 struct rotation_channel_info
*channel_info
= NULL
;
90 struct rotation_channel_key channel_key
= { .key
= key
,
93 cds_lfht_lookup(channel_pending_rotate_ht
,
94 hash_channel_key(&channel_key
),
97 node
= cds_lfht_iter_get_node(&iter
);
102 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
103 rotate_channels_ht_node
);
/* Unlink the entry: a pending rotation is only reported once per channel. */
104 cds_lfht_del(channel_pending_rotate_ht
, node
);
110 * Destroy the thread data previously created by the init function.
/*
 * Release the resources held by a rotation thread handle.
 *
 * handle: handle returned by rotation_thread_handle_create().
 *
 * Each of the three consumer channel rotation pipe descriptors (32-bit UST,
 * 64-bit UST, kernel) is closed only when it is valid, that is >= 0; a
 * negative value marks a descriptor that was never set. A PERROR message
 * exists for each close() call.
 *
 * NOTE(review): the conditions guarding the PERROR calls, any NULL check on
 * handle and the release of the handle itself are not visible in this view.
 */
112 void rotation_thread_handle_destroy(
113 struct rotation_thread_handle
*handle
)
121 if (handle
->ust32_consumer
>= 0) {
122 ret
= close(handle
->ust32_consumer
);
124 PERROR("close 32-bit consumer channel rotation pipe");
127 if (handle
->ust64_consumer
>= 0) {
128 ret
= close(handle
->ust64_consumer
);
130 PERROR("close 64-bit consumer channel rotation pipe");
133 if (handle
->kernel_consumer
>= 0) {
134 ret
= close(handle
->kernel_consumer
);
136 PERROR("close kernel consumer channel rotation pipe");
/*
 * Allocate and populate the handle passed to the rotation thread.
 *
 * ust32_channel_rotate_pipe:  32-bit UST consumer channel rotation pipe, may
 *                             be NULL.
 * ust64_channel_rotate_pipe:  64-bit UST consumer channel rotation pipe, may
 *                             be NULL.
 * kernel_channel_rotate_pipe: kernel consumer channel rotation pipe, may be
 *                             NULL.
 * thread_quit_pipe:           descriptor stored as-is in the handle.
 * rotation_timer_queue:       timer queue stored as-is in the handle.
 * notification_thread_handle: stored as-is in the handle.
 * notification_thread_ready:  semaphore stored as-is in the handle;
 *                             init_thread_state() waits on it.
 *
 * For every pipe that is provided, the read end is taken over from the pipe
 * with lttng_pipe_release_readfd() and stored in the handle, and the result
 * is checked for a negative value. Each consumer descriptor can also be set
 * to -1.
 *
 * NOTE(review): the else branches, the error labels and the return statement
 * are not visible in this view. Presumably -1 is stored when a pipe is NULL,
 * rotation_thread_handle_destroy() runs on the error path, and the handle
 * (or NULL on error) is returned - confirm.
 */
144 struct rotation_thread_handle
*rotation_thread_handle_create(
145 struct lttng_pipe
*ust32_channel_rotate_pipe
,
146 struct lttng_pipe
*ust64_channel_rotate_pipe
,
147 struct lttng_pipe
*kernel_channel_rotate_pipe
,
148 int thread_quit_pipe
,
149 struct rotation_thread_timer_queue
*rotation_timer_queue
,
150 struct notification_thread_handle
*notification_thread_handle
,
151 sem_t
*notification_thread_ready
)
153 struct rotation_thread_handle
*handle
;
155 handle
= zmalloc(sizeof(*handle
));
/* 32-bit user space consumer. */
160 if (ust32_channel_rotate_pipe
) {
161 handle
->ust32_consumer
=
162 lttng_pipe_release_readfd(
163 ust32_channel_rotate_pipe
);
164 if (handle
->ust32_consumer
< 0) {
168 handle
->ust32_consumer
= -1;
/* 64-bit user space consumer. */
170 if (ust64_channel_rotate_pipe
) {
171 handle
->ust64_consumer
=
172 lttng_pipe_release_readfd(
173 ust64_channel_rotate_pipe
);
174 if (handle
->ust64_consumer
< 0) {
178 handle
->ust64_consumer
= -1;
/* Kernel consumer. */
180 if (kernel_channel_rotate_pipe
) {
181 handle
->kernel_consumer
=
182 lttng_pipe_release_readfd(
183 kernel_channel_rotate_pipe
);
184 if (handle
->kernel_consumer
< 0) {
188 handle
->kernel_consumer
= -1;
/* The remaining arguments are stored in the handle without conversion. */
190 handle
->thread_quit_pipe
= thread_quit_pipe
;
191 handle
->rotation_timer_queue
= rotation_timer_queue
;
192 handle
->notification_thread_handle
= notification_thread_handle
;
193 handle
->notification_thread_ready
= notification_thread_ready
;
198 rotation_thread_handle_destroy(handle
);
/*
 * Create the poll set of the rotation thread and register its initial fds.
 *
 * poll_set: poll set to create.
 * handle:   rotation thread handle providing the descriptors.
 *
 * Registered, in order: the thread quit pipe, the read end of the rotation
 * timer queue event pipe, the 32-bit UST consumer pipe, the 64-bit UST
 * consumer pipe and, only when its descriptor is >= 0, the kernel consumer
 * pipe. An ERR message exists for each registration, and the poll set is
 * cleaned with lttng_poll_clean().
 *
 * NOTE(review): the ust32 and ust64 descriptors are added without the >= 0
 * guard used for the kernel descriptor, although
 * rotation_thread_handle_create() can leave them at -1 - confirm whether a
 * guard is needed.
 * NOTE(review): init_thread_state() later adds the notification channel
 * socket to this same poll set, so it ends up holding more descriptors than
 * the size of 5 requested here.
 * NOTE(review): the event masks, the error checks and the return statements
 * are not visible in this view; presumably lttng_poll_clean() only runs on
 * the error path.
 */
203 int init_poll_set(struct lttng_poll_event
*poll_set
,
204 struct rotation_thread_handle
*handle
)
209 * Create pollset with size 5:
210 * - sessiond quit pipe
211 * - sessiond timer pipe,
212 * - consumerd (32-bit user space) channel rotate pipe,
213 * - consumerd (64-bit user space) channel rotate pipe,
214 * - consumerd (kernel) channel rotate pipe,
216 ret
= lttng_poll_create(poll_set
, 5, LTTNG_CLOEXEC
);
221 ret
= lttng_poll_add(poll_set
, handle
->thread_quit_pipe
,
224 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
227 ret
= lttng_poll_add(poll_set
,
228 lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
),
231 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
234 ret
= lttng_poll_add(poll_set
, handle
->ust32_consumer
,
237 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
240 ret
= lttng_poll_add(poll_set
, handle
->ust64_consumer
,
243 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
/* The kernel consumer pipe is optional. */
246 if (handle
->kernel_consumer
>= 0) {
247 ret
= lttng_poll_add(poll_set
, handle
->kernel_consumer
,
250 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
258 lttng_poll_clean(poll_set
);
/*
 * Tear down what init_thread_state() set up.
 *
 * state: rotation thread state whose poll set is cleaned.
 *
 * Cleans the poll set, destroys the channel_pending_rotate_ht hash table and
 * destroys the notification channel when one was created.
 *
 * NOTE(review): the handling of the cds_lfht_destroy() return value is not
 * visible in this view.
 */
263 void fini_thread_state(struct rotation_thread_state
*state
)
267 lttng_poll_clean(&state
->events
);
268 ret
= cds_lfht_destroy(channel_pending_rotate_ht
, NULL
);
/* The channel is still NULL when initialization failed before creating it. */
270 if (rotate_notification_channel
) {
271 lttng_notification_channel_destroy(rotate_notification_channel
);
/*
 * Initialize the rotation thread state.
 *
 * handle: rotation thread handle.
 * state:  state to initialize; it is zeroed first.
 *
 * Steps, in order:
 *   1. zero the state and initialize then populate its poll set through
 *      init_poll_set();
 *   2. create the channel_pending_rotate_ht hash table;
 *   3. block on handle->notification_thread_ready, then create the
 *      notification channel on the session daemon notification endpoint;
 *   4. add the socket of the notification channel to the poll set.
 *
 * An ERR message exists for each step that can fail.
 *
 * NOTE(review): the error checks, the event mask of the last lttng_poll_add()
 * and the return statements are not visible in this view; presumably the
 * function returns 0 on success and a non-zero value on error.
 */
276 int init_thread_state(struct rotation_thread_handle
*handle
,
277 struct rotation_thread_state
*state
)
281 memset(state
, 0, sizeof(*state
));
282 lttng_poll_init(&state
->events
);
284 ret
= init_poll_set(&state
->events
, handle
);
286 ERR("[rotation-thread] Failed to initialize rotation thread poll set");
290 channel_pending_rotate_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
291 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
292 if (!channel_pending_rotate_ht
) {
293 ERR("[rotation-thread] Failed to create channel pending rotation hash table");
299 * We wait until the notification thread is ready to create the
300 * notification channel and add it to the poll_set.
302 sem_wait(handle
->notification_thread_ready
);
303 rotate_notification_channel
= lttng_notification_channel_create(
304 lttng_session_daemon_notification_endpoint
);
305 if (!rotate_notification_channel
) {
306 ERR("[rotation-thread] Could not create notification channel");
310 ret
= lttng_poll_add(&state
->events
, rotate_notification_channel
->socket
,
313 ERR("[rotation-thread] Failed to add notification fd to pollset");
/*
 * Handle activity on one of the consumer channel rotation pipes.
 *
 * fd:      descriptor that reported activity; it must be one of the consumer
 *          descriptors stored in handle.
 * revents: events reported for fd.
 * handle:  rotation thread handle.
 * state:   rotation thread state, used to remove fd from the poll set.
 *
 * The domain is derived from fd: UST for the 32-bit and 64-bit UST consumer
 * descriptors, kernel for the kernel consumer descriptor. On error or hang-up
 * events the descriptor is removed from the poll set. Otherwise a channel key
 * is read from the pipe, the matching entry is fetched with
 * lookup_channel_pending(), and the owning session is searched by id and
 * locked.
 *
 * The nr_chan_rotate_pending counter of the session is decremented. When it
 * reaches zero the completed chunk is renamed, rotate_pending is cleared and
 * last_chunk_start_ts takes the value of current_chunk_start_ts. Then either
 * the rotate pending relay timer is started, or the rotation state becomes
 * COMPLETED and the notification thread is told that the rotation is
 * complete.
 *
 * NOTE(review): many lines of this function (error checks, else branches,
 * return statements, the session list lock) are not visible in this view.
 * NOTE(review): the session-not-found path jumps to end_unlock_session_list,
 * which comes after the channel_rotation_info_destroy() call, while
 * lookup_channel_pending() has already removed channel_info from the hash
 * table. It looks like channel_info is leaked on that path - confirm.
 */
322 int handle_channel_rotation_pipe(int fd
, uint32_t revents
,
323 struct rotation_thread_handle
*handle
,
324 struct rotation_thread_state
*state
)
327 enum lttng_domain_type domain
;
328 struct rotation_channel_info
*channel_info
;
329 struct ltt_session
*session
= NULL
;
/* Both user space consumers report for the same tracing domain. */
332 if (fd
== handle
->ust32_consumer
||
333 fd
== handle
->ust64_consumer
) {
334 domain
= LTTNG_DOMAIN_UST
;
335 } else if (fd
== handle
->kernel_consumer
) {
336 domain
= LTTNG_DOMAIN_KERNEL
;
338 ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
/* The peer closed the pipe or it failed: stop polling this descriptor. */
343 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
344 ret
= lttng_poll_del(&state
->events
, fd
);
346 ERR("[rotation-thread] Failed to remove consumer "
347 "rotation pipe from poll set");
/* Read the channel key, restarting the read when it is interrupted. */
353 ret
= read(fd
, &key
, sizeof(key
));
354 } while (ret
== -1 && errno
== EINTR
);
355 if (ret
!= sizeof(key
)) {
356 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
362 DBG("[rotation-thread] Received notification for chan %" PRIu64
363 ", domain %d", key
, domain
);
365 channel_info
= lookup_channel_pending(key
, domain
);
367 ERR("[rotation-thread] Failed to find channel_info (key = %"
374 session
= session_find_by_id(channel_info
->session_id
);
377 * The session may have been destroyed before we had a chance to
378 * perform this action, return gracefully.
380 DBG("[rotation-thread] Session %" PRIu64
" not found",
381 channel_info
->session_id
);
383 goto end_unlock_session_list
;
386 session_lock(session
);
/* The rotation of the session is over once its last channel reported. */
387 if (--session
->nr_chan_rotate_pending
== 0) {
388 time_t now
= time(NULL
);
390 if (now
== (time_t) -1) {
391 session
->rotation_state
= LTTNG_ROTATION_STATE_ERROR
;
393 goto end_unlock_session
;
396 ret
= rename_complete_chunk(session
, now
);
398 ERR("Failed to rename completed rotation chunk");
399 goto end_unlock_session
;
401 session
->rotate_pending
= false;
402 session
->last_chunk_start_ts
= session
->current_chunk_start_ts
;
/* A relay still has to confirm the rotation: poll it with a timer. */
403 if (session
->rotate_pending_relay
) {
404 ret
= sessiond_timer_rotate_pending_start(
406 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
408 ERR("Failed to enable rotate pending timer");
410 goto end_unlock_session
;
413 struct lttng_trace_archive_location
*location
;
415 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
416 /* Ownership of location is transferred. */
417 location
= session_get_trace_archive_location(session
);
418 ret
= notification_thread_command_session_rotation_completed(
419 notification_thread_handle
,
423 session
->current_archive_id
,
425 if (ret
!= LTTNG_OK
) {
426 ERR("Failed to notify notification thread that rotation is complete for session %s",
431 DBG("Rotation completed for session %s", session
->name
);
437 channel_rotation_info_destroy(channel_info
);
438 session_unlock(session
);
439 end_unlock_session_list
:
440 session_unlock_list();
447 * Process the rotate_pending check, called with session lock held.
/*
 * Check whether the relay has completed the pending rotation of a session.
 *
 * session: session to check; the caller holds its lock.
 *
 * The relay is queried with relay_rotate_pending() for the archive id that
 * precedes current_archive_id. When the rotation is complete,
 * rotate_pending_relay is cleared, the rotation state becomes COMPLETED and
 * the notification thread is told that the rotation is complete. When the
 * query returns 1, the rotation is still pending and the rotate pending timer
 * is started again.
 *
 * NOTE(review): rotation_state is assigned LTTNG_ROTATION_STATE_COMPLETED
 * twice in a row below; the second assignment looks redundant.
 * NOTE(review): the conditions opening the error and completed branches and
 * the return statements are not visible in this view.
 */
450 int rotate_pending_relay_timer(struct ltt_session
*session
)
454 DBG("[rotation-thread] Check rotate pending on session %" PRIu64
,
456 ret
= relay_rotate_pending(session
, session
->current_archive_id
- 1);
458 ERR("[rotation-thread] Check relay rotate pending");
462 struct lttng_trace_archive_location
*location
;
464 DBG("[rotation-thread] Rotation completed on the relay for "
465 "session %" PRIu64
, session
->id
);
467 * Now we can clear the pending flag in the session. New
468 * rotations can start now.
470 session
->rotate_pending_relay
= false;
471 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
473 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
474 /* Ownership of location is transferred. */
475 location
= session_get_trace_archive_location(session
);
476 ret
= notification_thread_command_session_rotation_completed(
477 notification_thread_handle
,
481 session
->current_archive_id
,
483 if (ret
!= LTTNG_OK
) {
484 ERR("Failed to notify notification thread that rotation is complete for session %s",
/* Still pending on the relay: check again when the timer fires. */
487 } else if (ret
== 1) {
488 DBG("[rotation-thread] Rotation still pending on the relay for "
489 "session %" PRIu64
, session
->id
);
490 ret
= sessiond_timer_rotate_pending_start(session
,
491 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
493 ERR("Re-enabling rotate pending timer");
506 * Process the rotate_timer, called with session lock held.
/*
 * Perform the scheduled, time-triggered rotation of a session.
 *
 * session: session to rotate; the caller holds its lock.
 *
 * Two conditions are tested before rotating: the session is inactive with
 * its rotate timer enabled and rotated_after_last_stop set, or a rotation is
 * already pending, locally or on the relay. Otherwise cmd_rotate_session()
 * is called. A -LTTNG_ERR_ROTATION_PENDING result is only logged at debug
 * level, any other result different from LTTNG_OK is logged as an error.
 *
 * NOTE(review): the bodies of the two early checks and the return statements
 * are not visible in this view; presumably both checks skip the rotation.
 */
509 int rotate_timer(struct ltt_session
*session
)
514 * Complete _at most_ one scheduled rotation on a stopped session.
516 if (!session
->active
&& session
->rotate_timer_enabled
&&
517 session
->rotated_after_last_stop
) {
522 /* Ignore this timer if a rotation is already in progress. */
523 if (session
->rotate_pending
|| session
->rotate_pending_relay
) {
528 DBG("[rotation-thread] Rotate timer on session %s", session
->name
);
530 ret
= cmd_rotate_session(session
, NULL
);
531 if (ret
== -LTTNG_ERR_ROTATION_PENDING
) {
532 DBG("Scheduled rotation aborted since a rotation is already in progress");
535 } else if (ret
!= LTTNG_OK
) {
536 ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret
);
/*
 * Handle activity on the rotation timer queue event pipe.
 *
 * revents: events reported for the read end of queue->event_pipe.
 * handle:  rotation thread handle.
 * state:   rotation thread state, used to remove the fd from the poll set.
 * queue:   timer queue whose pending entries are processed.
 *
 * On error or hang-up events the descriptor is removed from the poll set.
 * Otherwise one byte is read from the pipe, then entries are taken from the
 * head of queue->list. Each entry is unlinked while holding queue->lock, the
 * session it names is searched by id and locked, and the entry is dispatched
 * on its signal: LTTNG_SESSIOND_SIG_ROTATE_PENDING goes to
 * rotate_pending_relay_timer(), LTTNG_SESSIOND_SIG_ROTATE_TIMER goes to
 * rotate_timer(), anything else is reported as an error. The session is
 * unlocked after the dispatch.
 *
 * NOTE(review): the loop construct, the error checks and the return
 * statements are not visible in this view.
 * NOTE(review): timer_data is unlinked from the queue here but its release is
 * not visible in this view - confirm it is freed on every path, including the
 * session-not-found path.
 */
548 int handle_rotate_timer_pipe(uint32_t revents
,
549 struct rotation_thread_handle
*handle
,
550 struct rotation_thread_state
*state
,
551 struct rotation_thread_timer_queue
*queue
)
554 int fd
= lttng_pipe_get_readfd(queue
->event_pipe
);
555 struct ltt_session
*session
;
558 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
559 ret
= lttng_poll_del(&state
->events
, fd
);
561 ERR("[rotation-thread] Failed to remove consumer "
562 "rotate pending pipe from poll set");
/* Consume the byte that was written to wake this thread up. */
567 ret
= lttng_read(fd
, buf
, 1);
569 ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd
);
575 struct sessiond_rotation_timer
*timer_data
;
578 * Take the queue lock only to pop elements from the list.
580 pthread_mutex_lock(&queue
->lock
);
581 if (cds_list_empty(&queue
->list
)) {
582 pthread_mutex_unlock(&queue
->lock
);
585 timer_data
= cds_list_first_entry(&queue
->list
,
586 struct sessiond_rotation_timer
, head
);
587 cds_list_del(&timer_data
->head
);
588 pthread_mutex_unlock(&queue
->lock
);
591 * session lock to lookup the session ID.
594 session
= session_find_by_id(timer_data
->session_id
);
596 DBG("[rotation-thread] Session %" PRIu64
" not found",
597 timer_data
->session_id
);
599 * This is a non-fatal error, and we cannot report it to the
600 * user (timer), so just print the error and continue the
603 session_unlock_list();
609 * Take the session lock and release the session_list lock.
611 session_lock(session
);
612 session_unlock_list();
/* Both timer handlers expect the session lock to be held. */
614 if (timer_data
->signal
== LTTNG_SESSIOND_SIG_ROTATE_PENDING
) {
615 ret
= rotate_pending_relay_timer(session
);
616 } else if (timer_data
->signal
== LTTNG_SESSIOND_SIG_ROTATE_TIMER
) {
617 ret
= rotate_timer(session
);
619 ERR("Unknown signal in rotate timer %d", timer_data
->signal
);
622 session_unlock(session
);
625 ERR("Error processing timer");
/*
 * React to a session consumed size notification by rotating the session.
 *
 * condition:                  condition of the received notification; only
 *                             LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE is
 *                             accepted.
 * evaluation:                 evaluation of the notification, from which the
 *                             consumed size is read.
 * notification_thread_handle: handle used to unsubscribe and subscribe.
 *
 * The session name is read from the condition and the consumed size from the
 * evaluation. The session is searched by name and locked, the current
 * consumed size subscription is removed and cmd_rotate_session() is called.
 * A -LTTNG_ERR_ROTATION_PENDING result is only logged at debug level. A new
 * subscription is then made for the threshold consumed + rotate_size, and the
 * session is unlocked.
 *
 * NOTE(review): the error checks, the goto targets and the return statements
 * are not visible in this view, so it cannot be told from here which failures
 * skip the new subscription.
 */
636 int handle_condition(
637 const struct lttng_condition
*condition
,
638 const struct lttng_evaluation
*evaluation
,
639 struct notification_thread_handle
*notification_thread_handle
)
642 const char *condition_session_name
= NULL
;
643 enum lttng_condition_type condition_type
;
644 enum lttng_condition_status condition_status
;
645 enum lttng_evaluation_status evaluation_status
;
647 struct ltt_session
*session
;
649 condition_type
= lttng_condition_get_type(condition
);
651 if (condition_type
!= LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
) {
653 ERR("[rotation-thread] Condition type and session usage type are not the same");
657 /* Fetch info to test */
658 condition_status
= lttng_condition_session_consumed_size_get_session_name(
659 condition
, &condition_session_name
);
660 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
661 ERR("[rotation-thread] Session name could not be fetched");
665 evaluation_status
= lttng_evaluation_session_consumed_size_get_consumed_size(evaluation
,
667 if (evaluation_status
!= LTTNG_EVALUATION_STATUS_OK
) {
668 ERR("[rotation-thread] Failed to get evaluation");
674 session
= session_find_by_name(condition_session_name
);
677 session_unlock_list();
678 ERR("[rotation-thread] Session \"%s\" not found",
679 condition_session_name
);
/* Hold the session lock, not the list lock, for the rest of the work. */
682 session_lock(session
);
683 session_unlock_list();
685 ret
= unsubscribe_session_consumed_size_rotation(session
,
686 notification_thread_handle
);
691 ret
= cmd_rotate_session(session
, NULL
);
692 if (ret
== -LTTNG_ERR_ROTATION_PENDING
) {
693 DBG("Rotate already pending, subscribe to the next threshold value");
694 } else if (ret
!= LTTNG_OK
) {
695 ERR("[rotation-thread] Failed to rotate on size notification with error: %s",
696 lttng_strerror(ret
));
/* The next threshold is the size consumed so far plus the rotation size. */
700 ret
= subscribe_session_consumed_size_rotation(session
,
701 consumed
+ session
->rotate_size
,
702 notification_thread_handle
);
704 ERR("[rotation-thread] Failed to subscribe to session consumed size condition");
710 session_unlock(session
);
/*
 * Handle activity on the notification channel socket.
 *
 * fd:      descriptor that reported activity.
 * revents: events reported for fd.
 * handle:  rotation thread handle, source of the notification thread handle
 *          passed to handle_condition().
 * state:   rotation thread state.
 *
 * The channel is first asked whether a notification is pending. The next
 * notification is then received and its status examined: dropped
 * notifications are not treated as an error, a closed channel and any
 * unknown status are reported. The condition and the evaluation of the
 * notification are passed to handle_condition(), and the notification is
 * destroyed.
 *
 * NOTE(review): the switch statement, the early exits and the return
 * statements are not visible in this view.
 * NOTE(review): the second argument of the first call below reads as a
 * mis-encoded &notification_pending (the leading ampersand sequence was
 * turned into a negation sign by the extraction); restore it before
 * compiling.
 */
716 int handle_notification_channel(int fd
, uint32_t revents
,
717 struct rotation_thread_handle
*handle
,
718 struct rotation_thread_state
*state
)
721 bool notification_pending
;
722 struct lttng_notification
*notification
= NULL
;
723 enum lttng_notification_channel_status status
;
724 const struct lttng_evaluation
*notification_evaluation
;
725 const struct lttng_condition
*notification_condition
;
727 status
= lttng_notification_channel_has_pending_notification(
728 rotate_notification_channel
, ¬ification_pending
);
729 if (status
!= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
) {
730 ERR("[rotation-thread ]Error occured while checking for pending notification");
735 if (!notification_pending
) {
740 /* Receive the next notification. */
741 status
= lttng_notification_channel_get_next_notification(
742 rotate_notification_channel
,
746 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
:
748 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
:
749 /* Not an error, we will wait for the next one */
752 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
:
753 ERR("Notification channel was closed");
757 /* Unhandled conditions / errors. */
758 ERR("Unknown notification channel status");
763 notification_condition
= lttng_notification_get_condition(notification
);
764 notification_evaluation
= lttng_notification_get_evaluation(notification
);
766 ret
= handle_condition(notification_condition
, notification_evaluation
,
767 handle
->notification_thread_handle
);
770 lttng_notification_destroy(notification
);
/*
 * Entry point of the rotation thread.
 *
 * data: pointer to the struct rotation_thread_handle of the thread.
 *
 * The thread registers itself with RCU and with the session daemon health
 * subsystem, initializes its state with init_thread_state() and signals that
 * it is ready with sessiond_notify_ready(). It then waits on its poll set
 * with no timeout and dispatches every descriptor that reported activity:
 *   - the thread quit pipe;
 *   - the rotation timer queue event pipe, to handle_rotate_timer_pipe();
 *   - a consumer channel rotation pipe, to handle_channel_rotation_pipe();
 *   - the notification channel socket, to handle_notification_channel().
 * On exit the state is finalized with fini_thread_state() and the health and
 * RCU registrations are undone.
 *
 * NOTE(review): the loop constructs, the error checks, the goto targets and
 * the return statement are not visible in this view; presumably activity on
 * the quit pipe and any handler failure leave the loop.
 */
774 void *thread_rotation(void *data
)
777 struct rotation_thread_handle
*handle
= data
;
778 struct rotation_thread_state state
;
780 DBG("[rotation-thread] Started rotation thread");
783 ERR("[rotation-thread] Invalid thread context provided");
787 rcu_register_thread();
790 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_ROTATION
);
791 health_code_update();
793 ret
= init_thread_state(handle
, &state
);
798 /* Ready to handle client connections. */
799 sessiond_notify_ready();
805 DBG("[rotation-thread] Entering poll wait");
/* A timeout of -1 is passed: wait until a descriptor reports activity. */
806 ret
= lttng_poll_wait(&state
.events
, -1);
807 DBG("[rotation-thread] Poll wait returned (%i)", ret
);
811 * Restart interrupted system call.
813 if (errno
== EINTR
) {
816 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret
);
821 for (i
= 0; i
< fd_count
; i
++) {
822 int fd
= LTTNG_POLL_GETFD(&state
.events
, i
);
823 uint32_t revents
= LTTNG_POLL_GETEV(&state
.events
, i
);
825 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
828 if (fd
== handle
->thread_quit_pipe
) {
829 DBG("[rotation-thread] Quit pipe activity");
831 } else if (fd
== lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
)) {
832 ret
= handle_rotate_timer_pipe(revents
,
833 handle
, &state
, handle
->rotation_timer_queue
);
835 ERR("[rotation-thread] Failed to handle rotation timer pipe event");
838 } else if (fd
== handle
->ust32_consumer
||
839 fd
== handle
->ust64_consumer
||
840 fd
== handle
->kernel_consumer
) {
841 ret
= handle_channel_rotation_pipe(fd
,
842 revents
, handle
, &state
);
844 ERR("[rotation-thread] Failed to handle channel rotation pipe");
847 } else if (fd
== rotate_notification_channel
->socket
) {
848 ret
= handle_notification_channel(fd
, revents
,
851 ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
859 DBG("[rotation-thread] Exit");
860 fini_thread_state(&state
);
861 health_unregister(health_sessiond
);
862 rcu_thread_offline();
863 rcu_unregister_thread();