2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 #include <lttng/trigger/trigger.h>
20 #include <common/error.h>
21 #include <common/config/session-config.h>
22 #include <common/defaults.h>
23 #include <common/utils.h>
24 #include <common/futex.h>
25 #include <common/align.h>
26 #include <common/time.h>
27 #include <common/hashtable/utils.h>
28 #include <sys/eventfd.h>
34 #include <common/kernel-ctl/kernel-ctl.h>
35 #include <lttng/notification/channel-internal.h>
36 #include <lttng/rotate-internal.h>
38 #include "rotation-thread.h"
39 #include "lttng-sessiond.h"
40 #include "health-sessiond.h"
44 #include "sessiond-timer.h"
47 #include <urcu/list.h>
48 #include <urcu/rculfhash.h>
51 * Store a struct rotation_channel_info for each channel that is currently
52 * being rotated by the consumer.
/*
 * Hash table of channels whose rotation is pending. It is created in
 * init_thread_state(), searched (and matching entries unlinked) by
 * lookup_channel_pending(), and destroyed in fini_thread_state().
 */
54 struct cds_lfht
*channel_pending_rotate_ht
;
/*
 * Notification channel used by the rotation thread; NULL until
 * init_thread_state() creates it. Its socket is added to the thread's poll
 * set, and fini_thread_state() destroys it when it is non-NULL.
 */
56 struct lttng_notification_channel
*rotate_notification_channel
= NULL
;
/*
 * State private to the rotation thread. The only member visible here is the
 * poll set (events) that thread_rotation() waits on.
 */
58 struct rotation_thread_state
{
59 struct lttng_poll_event events
;
/*
 * Dispose of a rotation_channel_info. handle_channel_rotation_pipe() calls
 * this once it is done with an entry obtained from lookup_channel_pending().
 * NOTE(review): the body is not visible here; confirm exactly what is
 * released.
 */
63 void channel_rotation_info_destroy(struct rotation_channel_info
*channel_info
)
/*
 * Compare a hash table node against a lookup key.
 *
 * node is embedded in a struct rotation_channel_info (member
 * rotate_channels_ht_node); key points to a struct rotation_channel_key.
 * Returns 1 when both the channel key and the domain are equal, 0 otherwise
 * (the result is normalized with !!).
 */
70 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
72 struct rotation_channel_key
*channel_key
= (struct rotation_channel_key
*) key
;
73 struct rotation_channel_info
*channel_info
;
/* Recover the enclosing rotation_channel_info from its embedded node. */
75 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
76 rotate_channels_ht_node
);
78 return !!((channel_key
->key
== channel_info
->channel_key
.key
) &&
79 (channel_key
->domain
== channel_info
->channel_key
.domain
));
/*
 * Find the pending-rotation entry for (key, domain) in
 * channel_pending_rotate_ht and unlink it from the table.
 *
 * The matched entry is removed with cds_lfht_del(), so it is no longer
 * reachable through the table afterwards; handle_channel_rotation_pipe()
 * later hands it to channel_rotation_info_destroy().
 *
 * NOTE(review): channel_info starts out NULL; presumably that is what gets
 * returned when no node matches, but neither the not-found check nor the
 * return statement is visible here.
 * NOTE(review): liburcu requires cds_lfht_lookup() and cds_lfht_del() to be
 * called within an RCU read-side critical section; no rcu_read_lock() is
 * visible in this function, confirm that the caller provides it.
 */
83 struct rotation_channel_info
*lookup_channel_pending(uint64_t key
,
84 enum lttng_domain_type domain
)
86 struct cds_lfht_iter iter
;
87 struct cds_lfht_node
*node
;
88 struct rotation_channel_info
*channel_info
= NULL
;
89 struct rotation_channel_key channel_key
= { .key
= key
,
/* The bucket is selected by hashing the (key, domain) lookup key. */
92 cds_lfht_lookup(channel_pending_rotate_ht
,
93 hash_channel_key(&channel_key
),
96 node
= cds_lfht_iter_get_node(&iter
);
101 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
102 rotate_channels_ht_node
);
/* Unlink the matched entry from the table. */
103 cds_lfht_del(channel_pending_rotate_ht
, node
);
109 * Destroy the thread data previously created by the init function.
/*
 * Close the consumer channel rotation pipe file descriptors held by handle.
 * Each of ust32_consumer, ust64_consumer and kernel_consumer is closed only
 * when it is >= 0.
 *
 * NOTE(review): the PERROR() calls presumably sit behind a check of close()'s
 * return value that is not visible here; whether handle itself is released
 * is not visible either.
 */
111 void rotation_thread_handle_destroy(
112 struct rotation_thread_handle
*handle
)
120 if (handle
->ust32_consumer
>= 0) {
121 ret
= close(handle
->ust32_consumer
);
123 PERROR("close 32-bit consumer channel rotation pipe");
126 if (handle
->ust64_consumer
>= 0) {
127 ret
= close(handle
->ust64_consumer
);
129 PERROR("close 64-bit consumer channel rotation pipe");
132 if (handle
->kernel_consumer
>= 0) {
133 ret
= close(handle
->kernel_consumer
);
135 PERROR("close kernel consumer channel rotation pipe");
/*
 * Allocate (zmalloc) and fill a rotation_thread_handle.
 *
 * For each non-NULL consumer rotate pipe, the read fd is taken out of the
 * pipe with lttng_pipe_release_readfd() and stored in the handle; a negative
 * result is tested for. The quit pipe fd, the timer queue, the notification
 * thread handle and the notification_thread_ready semaphore are stored as
 * passed in.
 *
 * NOTE(review): the `= -1` assignments presumably belong to the `else` of
 * the corresponding pipe check (pipe absent), and the
 * rotation_thread_handle_destroy() call to the error path; neither those
 * branches nor the return statements are visible here.
 */
143 struct rotation_thread_handle
*rotation_thread_handle_create(
144 struct lttng_pipe
*ust32_channel_rotate_pipe
,
145 struct lttng_pipe
*ust64_channel_rotate_pipe
,
146 struct lttng_pipe
*kernel_channel_rotate_pipe
,
147 int thread_quit_pipe
,
148 struct rotation_thread_timer_queue
*rotation_timer_queue
,
149 struct notification_thread_handle
*notification_thread_handle
,
150 sem_t
*notification_thread_ready
)
152 struct rotation_thread_handle
*handle
;
154 handle
= zmalloc(sizeof(*handle
));
/* 32-bit user space consumer rotation pipe. */
159 if (ust32_channel_rotate_pipe
) {
160 handle
->ust32_consumer
=
161 lttng_pipe_release_readfd(
162 ust32_channel_rotate_pipe
);
163 if (handle
->ust32_consumer
< 0) {
167 handle
->ust32_consumer
= -1;
/* 64-bit user space consumer rotation pipe. */
169 if (ust64_channel_rotate_pipe
) {
170 handle
->ust64_consumer
=
171 lttng_pipe_release_readfd(
172 ust64_channel_rotate_pipe
);
173 if (handle
->ust64_consumer
< 0) {
177 handle
->ust64_consumer
= -1;
/* Kernel consumer rotation pipe. */
179 if (kernel_channel_rotate_pipe
) {
180 handle
->kernel_consumer
=
181 lttng_pipe_release_readfd(
182 kernel_channel_rotate_pipe
);
183 if (handle
->kernel_consumer
< 0) {
187 handle
->kernel_consumer
= -1;
189 handle
->thread_quit_pipe
= thread_quit_pipe
;
190 handle
->rotation_timer_queue
= rotation_timer_queue
;
191 handle
->notification_thread_handle
= notification_thread_handle
;
192 handle
->notification_thread_ready
= notification_thread_ready
;
197 rotation_thread_handle_destroy(handle
);
/*
 * Build the rotation thread's poll set: create it (size 5, LTTNG_CLOEXEC),
 * then add the quit pipe, the read end of the timer queue's event pipe, the
 * two user space consumer rotation pipes and, when kernel_consumer is >= 0,
 * the kernel consumer rotation pipe. Each add has its own ERR() message
 * naming the fd concerned.
 *
 * NOTE(review): the lttng_poll_clean() call presumably runs on the error
 * path only; the labels and return statements are not visible here.
 * NOTE(review): unlike kernel_consumer, no `>= 0` guard is visible around
 * the ust32/ust64 adds, although rotation_thread_handle_create() can store
 * -1 in those fields; confirm.
 */
202 int init_poll_set(struct lttng_poll_event
*poll_set
,
203 struct rotation_thread_handle
*handle
)
208 * Create pollset with size 5:
209 * - sessiond quit pipe
210 * - sessiond timer pipe,
211 * - consumerd (32-bit user space) channel rotate pipe,
212 * - consumerd (64-bit user space) channel rotate pipe,
213 * - consumerd (kernel) channel rotate pipe,
215 ret
= lttng_poll_create(poll_set
, 5, LTTNG_CLOEXEC
);
220 ret
= lttng_poll_add(poll_set
, handle
->thread_quit_pipe
,
223 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
226 ret
= lttng_poll_add(poll_set
,
227 lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
),
230 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
233 ret
= lttng_poll_add(poll_set
, handle
->ust32_consumer
,
236 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
239 ret
= lttng_poll_add(poll_set
, handle
->ust64_consumer
,
242 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
245 if (handle
->kernel_consumer
>= 0) {
246 ret
= lttng_poll_add(poll_set
, handle
->kernel_consumer
,
249 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
257 lttng_poll_clean(poll_set
);
/*
 * Release the rotation thread's resources: clean the poll set in state,
 * destroy the channel_pending_rotate_ht hash table and, when one exists,
 * destroy the rotate_notification_channel.
 */
262 void fini_thread_state(struct rotation_thread_state
*state
)
264 lttng_poll_clean(&state
->events
);
265 cds_lfht_destroy(channel_pending_rotate_ht
, NULL
);
266 if (rotate_notification_channel
) {
267 lttng_notification_channel_destroy(rotate_notification_channel
);
/*
 * Initialize the rotation thread state.
 *
 * Steps, in order: zero *state and init its poll set, populate the poll set
 * with init_poll_set(), create the channel_pending_rotate_ht hash table,
 * wait on handle->notification_thread_ready, create the notification channel
 * on lttng_session_daemon_notification_endpoint, and add that channel's
 * socket to the poll set.
 *
 * NOTE(review): the error checks guarding the ERR() calls and the return
 * statements are not all visible here.
 */
272 int init_thread_state(struct rotation_thread_handle
*handle
,
273 struct rotation_thread_state
*state
)
277 memset(state
, 0, sizeof(*state
));
278 lttng_poll_init(&state
->events
);
280 ret
= init_poll_set(&state
->events
, handle
);
282 ERR("[rotation-thread] Failed to initialize rotation thread poll set");
/*
 * Initial size DEFAULT_HT_SIZE, at least 1 allocated bucket, no upper bound
 * on the bucket count (0), automatic resizing with node accounting.
 */
286 channel_pending_rotate_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
287 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
288 if (!channel_pending_rotate_ht
) {
289 ERR("[rotation-thread] Failed to create channel pending rotation hash table");
295 * We wait until the notification thread is ready to create the
296 * notification channel and add it to the poll_set.
298 sem_wait(handle
->notification_thread_ready
);
299 rotate_notification_channel
= lttng_notification_channel_create(
300 lttng_session_daemon_notification_endpoint
);
301 if (!rotate_notification_channel
) {
302 ERR("[rotation-thread] Could not create notification channel");
306 ret
= lttng_poll_add(&state
->events
, rotate_notification_channel
->socket
,
309 ERR("[rotation-thread] Failed to add notification fd to pollset");
/*
 * Handle activity on one of the consumer channel rotation pipes.
 *
 * The domain is derived from which handle fd matches: either user space
 * consumer fd gives LTTNG_DOMAIN_UST, the kernel consumer fd gives
 * LTTNG_DOMAIN_KERNEL; any other fd is reported with ERR(). On LPOLLERR,
 * LPOLLHUP or LPOLLRDHUP the fd is removed from the poll set. Otherwise a
 * channel key is read from the pipe, its pending entry is fetched with
 * lookup_channel_pending(), the owning session is looked up by id and its
 * nr_chan_rotate_pending counter is decremented under the session lock.
 *
 * When the counter reaches 0, the completed chunk is renamed with
 * rename_complete_chunk(), rotate_pending is cleared and last_chunk_start_ts
 * is updated. If rotate_pending_relay is set the rotate-pending timer is
 * started; the rotation state is set to LTTNG_ROTATION_STATE_COMPLETED
 * (NOTE(review): presumably in the `else` of that test, which is not visible
 * here).
 *
 * NOTE(review): when the session is not found, control jumps to
 * end_unlock_session_list, which lies after the
 * channel_rotation_info_destroy() call. channel_info has already been
 * unlinked from the hash table by lookup_channel_pending(), so it appears to
 * be leaked on that path; confirm.
 */
318 int handle_channel_rotation_pipe(int fd
, uint32_t revents
,
319 struct rotation_thread_handle
*handle
,
320 struct rotation_thread_state
*state
)
323 enum lttng_domain_type domain
;
324 struct rotation_channel_info
*channel_info
;
325 struct ltt_session
*session
= NULL
;
/* Identify the tracing domain from the pipe that woke us up. */
328 if (fd
== handle
->ust32_consumer
||
329 fd
== handle
->ust64_consumer
) {
330 domain
= LTTNG_DOMAIN_UST
;
331 } else if (fd
== handle
->kernel_consumer
) {
332 domain
= LTTNG_DOMAIN_KERNEL
;
334 ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
/* Error or hang-up on the pipe: stop polling it. */
339 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
340 ret
= lttng_poll_del(&state
->events
, fd
);
342 ERR("[rotation-thread] Failed to remove consumer "
343 "rotation pipe from poll set");
/* Read one channel key, retrying while read() is interrupted (EINTR). */
349 ret
= read(fd
, &key
, sizeof(key
));
350 } while (ret
== -1 && errno
== EINTR
);
351 if (ret
!= sizeof(key
)) {
352 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
358 DBG("[rotation-thread] Received notification for chan %" PRIu64
359 ", domain %d", key
, domain
);
361 channel_info
= lookup_channel_pending(key
, domain
);
363 ERR("[rotation-thread] Failed to find channel_info (key = %"
370 session
= session_find_by_id(channel_info
->session_id
);
373 * The session may have been destroyed before we had a chance to
374 * perform this action, return gracefully.
376 DBG("[rotation-thread] Session %" PRIu64
" not found",
377 channel_info
->session_id
);
379 goto end_unlock_session_list
;
382 session_lock(session
);
/* Last pending channel of this session: finish the rotation. */
383 if (--session
->nr_chan_rotate_pending
== 0) {
384 time_t now
= time(NULL
);
386 if (now
== (time_t) -1) {
387 session
->rotation_state
= LTTNG_ROTATION_STATE_ERROR
;
389 goto end_unlock_session
;
392 ret
= rename_complete_chunk(session
, now
);
394 ERR("Failed to rename completed rotation chunk");
395 goto end_unlock_session
;
397 session
->rotate_pending
= false;
398 session
->last_chunk_start_ts
= session
->current_chunk_start_ts
;
399 if (session
->rotate_pending_relay
) {
400 ret
= sessiond_timer_rotate_pending_start(
402 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
404 ERR("Failed to enable rotate pending timer");
406 goto end_unlock_session
;
409 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
411 DBG("Rotation completed for session %s", session
->name
);
417 channel_rotation_info_destroy(channel_info
);
418 session_unlock(session
);
419 end_unlock_session_list
:
420 session_unlock_list();
427 * Process the rotate_pending check, called with session lock held.
/*
 * Ask the relay, through relay_rotate_pending(), whether the rotation
 * numbered session->rotate_count - 1 is still pending.
 *
 * When the rotation is reported complete, rotate_pending_relay is cleared
 * and the rotation state becomes LTTNG_ROTATION_STATE_COMPLETED. When
 * relay_rotate_pending() returns 1 (still pending), the rotate-pending timer
 * is started again with DEFAULT_ROTATE_PENDING_RELAY_TIMER.
 *
 * NOTE(review): the exact conditions selecting the error branch and the
 * "completed" branch are not visible here.
 */
430 int rotate_pending_relay_timer(struct ltt_session
*session
)
434 DBG("[rotation-thread] Check rotate pending on session %" PRIu64
,
436 ret
= relay_rotate_pending(session
, session
->rotate_count
- 1);
438 ERR("[rotation-thread] Check relay rotate pending");
442 DBG("[rotation-thread] Rotation completed on the relay for "
443 "session %" PRIu64
, session
->id
);
445 * Now we can clear the pending flag in the session. New
446 * rotations can start now.
448 session
->rotate_pending_relay
= false;
449 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
450 } else if (ret
== 1) {
451 DBG("[rotation-thread] Rotation still pending on the relay for "
452 "session %" PRIu64
, session
->id
);
453 ret
= sessiond_timer_rotate_pending_start(session
,
454 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
456 ERR("Re-enabling rotate pending timer");
469 * Process the rotate_timer, called with session lock held.
/*
 * Run a time-triggered rotation for session via cmd_rotate_session().
 *
 * Two conditions are tested before rotating: an inactive session whose
 * rotate timer is enabled and which has already rotated since its last stop,
 * and a session with a rotation already pending (locally or on the relay).
 * NOTE(review): both presumably skip the rotation, but the bodies of those
 * branches are not visible here.
 *
 * A -LTTNG_ERR_ROTATION_PENDING result is only logged at debug level; any
 * other result different from LTTNG_OK is logged with ERR().
 */
472 int rotate_timer(struct ltt_session
*session
)
477 * Complete _at most_ one scheduled rotation on a stopped session.
479 if (!session
->active
&& session
->rotate_timer_enabled
&&
480 session
->rotated_after_last_stop
) {
485 /* Ignore this timer if a rotation is already in progress. */
486 if (session
->rotate_pending
|| session
->rotate_pending_relay
) {
491 DBG("[rotation-thread] Rotate timer on session %s", session
->name
);
493 ret
= cmd_rotate_session(session
, NULL
);
494 if (ret
== -LTTNG_ERR_ROTATION_PENDING
) {
495 DBG("Scheduled rotation aborted since a rotation is already in progress");
498 } else if (ret
!= LTTNG_OK
) {
499 ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret
);
/*
 * Handle activity on the rotation timer queue's event pipe.
 *
 * On LPOLLERR, LPOLLHUP or LPOLLRDHUP the pipe fd is removed from the poll
 * set. Otherwise one byte is read from the pipe, then entries are taken off
 * queue->list, each removal being done under queue->lock. For every entry
 * the session is looked up by id: if it is missing, a debug message is
 * logged and the session list is unlocked; if it is found, the session lock
 * is taken, the session list lock is released, and the entry is dispatched
 * on its signal:
 *   - LTTNG_SESSIOND_SIG_ROTATE_PENDING -> rotate_pending_relay_timer()
 *   - LTTNG_SESSIOND_SIG_ROTATE_TIMER   -> rotate_timer()
 *   - anything else is reported with ERR().
 *
 * NOTE(review): no release of timer_data is visible after it is removed from
 * the list; confirm that it is freed on both the found and not-found paths.
 */
511 int handle_rotate_timer_pipe(uint32_t revents
,
512 struct rotation_thread_handle
*handle
,
513 struct rotation_thread_state
*state
,
514 struct rotation_thread_timer_queue
*queue
)
517 int fd
= lttng_pipe_get_readfd(queue
->event_pipe
);
518 struct ltt_session
*session
;
/* Error or hang-up on the pipe: stop polling it. */
521 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
522 ret
= lttng_poll_del(&state
->events
, fd
);
524 ERR("[rotation-thread] Failed to remove consumer "
525 "rotate pending pipe from poll set");
/* Consume a single byte from the wakeup pipe. */
530 ret
= lttng_read(fd
, buf
, 1);
532 ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd
);
538 struct sessiond_rotation_timer
*timer_data
;
541 * Take the queue lock only to pop elements from the list.
543 pthread_mutex_lock(&queue
->lock
);
544 if (cds_list_empty(&queue
->list
)) {
545 pthread_mutex_unlock(&queue
->lock
);
548 timer_data
= cds_list_first_entry(&queue
->list
,
549 struct sessiond_rotation_timer
, head
);
550 cds_list_del(&timer_data
->head
);
551 pthread_mutex_unlock(&queue
->lock
);
554 * session lock to lookup the session ID.
557 session
= session_find_by_id(timer_data
->session_id
);
559 DBG("[rotation-thread] Session %" PRIu64
" not found",
560 timer_data
->session_id
);
562 * This is a non-fatal error, and we cannot report it to the
563 * user (timer), so just print the error and continue the
566 session_unlock_list();
572 * Take the session lock and release the session_list lock.
574 session_lock(session
);
575 session_unlock_list();
/* Dispatch on the signal that queued this entry. */
577 if (timer_data
->signal
== LTTNG_SESSIOND_SIG_ROTATE_PENDING
) {
578 ret
= rotate_pending_relay_timer(session
);
579 } else if (timer_data
->signal
== LTTNG_SESSIOND_SIG_ROTATE_TIMER
) {
580 ret
= rotate_timer(session
);
582 ERR("Unknown signal in rotate timer %d", timer_data
->signal
);
585 session_unlock(session
);
588 ERR("Error processing timer");
/*
 * React to a session-consumed-size notification by rotating the session.
 *
 * The condition type is compared against
 * LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE (a mismatch is reported with
 * ERR()). The session name is read from the condition and the consumed size
 * from the evaluation, then the session is looked up by name. With the
 * session lock held (and the session list lock released), the function:
 *   1. unsubscribes from the current consumed-size condition,
 *   2. calls cmd_rotate_session(session, NULL),
 *   3. subscribes again with a threshold of consumed + session->rotate_size.
 *
 * A -LTTNG_ERR_ROTATION_PENDING result from the rotation is only logged at
 * debug level; any other result different from LTTNG_OK is logged with
 * ERR().
 *
 * NOTE(review): the gotos and labels are not visible here, so which failures
 * skip the re-subscription cannot be told from this view.
 */
599 int handle_condition(
600 const struct lttng_condition
*condition
,
601 const struct lttng_evaluation
*evaluation
,
602 struct notification_thread_handle
*notification_thread_handle
)
605 const char *condition_session_name
= NULL
;
606 enum lttng_condition_type condition_type
;
607 enum lttng_condition_status condition_status
;
608 enum lttng_evaluation_status evaluation_status
;
610 struct ltt_session
*session
;
612 condition_type
= lttng_condition_get_type(condition
);
614 if (condition_type
!= LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
) {
616 ERR("[rotation-thread] Condition type and session usage type are not the same");
620 /* Fetch info to test */
621 condition_status
= lttng_condition_session_consumed_size_get_session_name(
622 condition
, &condition_session_name
);
623 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
624 ERR("[rotation-thread] Session name could not be fetched");
628 evaluation_status
= lttng_evaluation_session_consumed_size_get_consumed_size(evaluation
,
630 if (evaluation_status
!= LTTNG_EVALUATION_STATUS_OK
) {
631 ERR("[rotation-thread] Failed to get evaluation");
637 session
= session_find_by_name(condition_session_name
);
640 session_unlock_list();
641 ERR("[rotation-thread] Session \"%s\" not found",
642 condition_session_name
);
/* Hold the session lock, then let go of the session list lock. */
645 session_lock(session
);
646 session_unlock_list();
648 ret
= unsubscribe_session_consumed_size_rotation(session
,
649 notification_thread_handle
);
654 ret
= cmd_rotate_session(session
, NULL
);
655 if (ret
== -LTTNG_ERR_ROTATION_PENDING
) {
656 DBG("Rotate already pending, subscribe to the next threshold value");
658 } else if (ret
!= LTTNG_OK
) {
659 ERR("[rotation-thread] Failed to rotate on size notification with error: %s",
660 lttng_strerror(ret
));
/* Next threshold: the size consumed so far plus the configured rotate size. */
664 ret
= subscribe_session_consumed_size_rotation(session
,
665 consumed
+ session
->rotate_size
,
666 notification_thread_handle
);
668 ERR("[rotation-thread] Failed to subscribe to session consumed size condition");
674 session_unlock(session
);
/*
 * Receive the next notification from rotate_notification_channel and act on
 * it.
 *
 * The channel status is switched on: LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
 * and ..._NOTIFICATIONS_DROPPED are not treated as errors, ..._CLOSED and
 * any other status are reported with ERR(). The notification's condition and
 * evaluation are then passed to handle_condition() together with
 * handle->notification_thread_handle, and the notification is destroyed.
 *
 * NOTE(review): fd, revents and state are not referenced in the lines
 * visible here; the control flow between the switch cases and the final
 * cleanup is not visible either.
 */
680 int handle_notification_channel(int fd
, uint32_t revents
,
681 struct rotation_thread_handle
*handle
,
682 struct rotation_thread_state
*state
)
685 struct lttng_notification
*notification
;
686 enum lttng_notification_channel_status status
;
687 const struct lttng_evaluation
*notification_evaluation
;
688 const struct lttng_condition
*notification_condition
;
690 /* Receive the next notification. */
691 status
= lttng_notification_channel_get_next_notification(
692 rotate_notification_channel
,
696 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
:
698 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
:
699 /* Not an error, we will wait for the next one */
702 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
:
703 ERR("Notification channel was closed");
707 /* Unhandled conditions / errors. */
708 ERR("Unknown notification channel status");
713 notification_condition
= lttng_notification_get_condition(notification
);
714 notification_evaluation
= lttng_notification_get_evaluation(notification
);
716 ret
= handle_condition(notification_condition
, notification_evaluation
,
717 handle
->notification_thread_handle
);
720 lttng_notification_destroy(notification
);
/*
 * Entry point of the rotation thread; data is the rotation_thread_handle.
 *
 * Start-up: register the thread with RCU and with the sessiond health
 * subsystem (HEALTH_SESSIOND_TYPE_ROTATION), initialize the thread state
 * with init_thread_state(), then call sessiond_notify_ready().
 *
 * Main loop: block in lttng_poll_wait() with no timeout (-1), then walk the
 * ready fds and dispatch each one:
 *   - thread quit pipe: logged as quit pipe activity,
 *   - timer queue event pipe: handle_rotate_timer_pipe(),
 *   - any consumer rotation pipe: handle_channel_rotation_pipe(),
 *   - notification channel socket: handle_notification_channel().
 *
 * Shutdown: tear down the thread state, unregister from the health
 * subsystem, and take the thread offline from / unregister it from RCU.
 *
 * NOTE(review): the loop construct, the exit jumps taken after handler
 * failures or quit pipe activity, and the return statement are not visible
 * here.
 */
729 void *thread_rotation(void *data
)
732 struct rotation_thread_handle
*handle
= data
;
733 struct rotation_thread_state state
;
735 DBG("[rotation-thread] Started rotation thread");
738 ERR("[rotation-thread] Invalid thread context provided");
742 rcu_register_thread();
745 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_ROTATION
);
746 health_code_update();
748 ret
= init_thread_state(handle
, &state
);
753 /* Ready to handle client connections. */
754 sessiond_notify_ready();
760 DBG("[rotation-thread] Entering poll wait");
761 ret
= lttng_poll_wait(&state
.events
, -1);
762 DBG("[rotation-thread] Poll wait returned (%i)", ret
);
766 * Restart interrupted system call.
768 if (errno
== EINTR
) {
771 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret
);
/* Dispatch every fd reported ready by the poll wait. */
776 for (i
= 0; i
< fd_count
; i
++) {
777 int fd
= LTTNG_POLL_GETFD(&state
.events
, i
);
778 uint32_t revents
= LTTNG_POLL_GETEV(&state
.events
, i
);
780 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
783 if (fd
== handle
->thread_quit_pipe
) {
784 DBG("[rotation-thread] Quit pipe activity");
786 } else if (fd
== lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
)) {
787 ret
= handle_rotate_timer_pipe(revents
,
788 handle
, &state
, handle
->rotation_timer_queue
);
790 ERR("[rotation-thread] Failed to handle rotation timer pipe event");
793 } else if (fd
== handle
->ust32_consumer
||
794 fd
== handle
->ust64_consumer
||
795 fd
== handle
->kernel_consumer
) {
796 ret
= handle_channel_rotation_pipe(fd
,
797 revents
, handle
, &state
);
799 ERR("[rotation-thread] Handle channel rotation pipe");
802 } else if (fd
== rotate_notification_channel
->socket
) {
803 ret
= handle_notification_channel(fd
, revents
,
806 ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
814 DBG("[rotation-thread] Exit");
815 fini_thread_state(&state
);
816 health_unregister(health_sessiond
);
817 rcu_thread_offline();
818 rcu_unregister_thread();