2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 #include <lttng/trigger/trigger.h>
20 #include <common/error.h>
21 #include <common/config/session-config.h>
22 #include <common/defaults.h>
23 #include <common/utils.h>
24 #include <common/futex.h>
25 #include <common/align.h>
26 #include <common/time.h>
27 #include <common/hashtable/utils.h>
28 #include <sys/eventfd.h>
34 #include <common/kernel-ctl/kernel-ctl.h>
35 #include <lttng/notification/channel-internal.h>
36 #include <lttng/rotate-internal.h>
38 #include "rotation-thread.h"
39 #include "lttng-sessiond.h"
40 #include "health-sessiond.h"
44 #include "sessiond-timer.h"
47 #include <urcu/list.h>
48 #include <urcu/rculfhash.h>
51 * Store a struct rotation_channel_info for each channel that is currently
52 * being rotated by the consumer.
54 struct cds_lfht
*channel_pending_rotate_ht
;
56 struct rotation_thread_state
{
57 struct lttng_poll_event events
;
61 void channel_rotation_info_destroy(struct rotation_channel_info
*channel_info
)
68 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
70 struct rotation_channel_key
*channel_key
= (struct rotation_channel_key
*) key
;
71 struct rotation_channel_info
*channel_info
;
73 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
74 rotate_channels_ht_node
);
76 return !!((channel_key
->key
== channel_info
->channel_key
.key
) &&
77 (channel_key
->domain
== channel_info
->channel_key
.domain
));
81 struct rotation_channel_info
*lookup_channel_pending(uint64_t key
,
82 enum lttng_domain_type domain
)
84 struct cds_lfht_iter iter
;
85 struct cds_lfht_node
*node
;
86 struct rotation_channel_info
*channel_info
= NULL
;
87 struct rotation_channel_key channel_key
= { .key
= key
,
90 cds_lfht_lookup(channel_pending_rotate_ht
,
91 hash_channel_key(&channel_key
),
94 node
= cds_lfht_iter_get_node(&iter
);
99 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
100 rotate_channels_ht_node
);
101 cds_lfht_del(channel_pending_rotate_ht
, node
);
107 * Destroy the thread data previously created by the init function.
109 void rotation_thread_handle_destroy(
110 struct rotation_thread_handle
*handle
)
118 if (handle
->ust32_consumer
>= 0) {
119 ret
= close(handle
->ust32_consumer
);
121 PERROR("close 32-bit consumer channel rotation pipe");
124 if (handle
->ust64_consumer
>= 0) {
125 ret
= close(handle
->ust64_consumer
);
127 PERROR("close 64-bit consumer channel rotation pipe");
130 if (handle
->kernel_consumer
>= 0) {
131 ret
= close(handle
->kernel_consumer
);
133 PERROR("close kernel consumer channel rotation pipe");
141 struct rotation_thread_handle
*rotation_thread_handle_create(
142 struct lttng_pipe
*ust32_channel_rotate_pipe
,
143 struct lttng_pipe
*ust64_channel_rotate_pipe
,
144 struct lttng_pipe
*kernel_channel_rotate_pipe
,
145 int thread_quit_pipe
,
146 struct rotation_thread_timer_queue
*rotation_timer_queue
)
148 struct rotation_thread_handle
*handle
;
150 handle
= zmalloc(sizeof(*handle
));
155 if (ust32_channel_rotate_pipe
) {
156 handle
->ust32_consumer
=
157 lttng_pipe_release_readfd(
158 ust32_channel_rotate_pipe
);
159 if (handle
->ust32_consumer
< 0) {
163 handle
->ust32_consumer
= -1;
165 if (ust64_channel_rotate_pipe
) {
166 handle
->ust64_consumer
=
167 lttng_pipe_release_readfd(
168 ust64_channel_rotate_pipe
);
169 if (handle
->ust64_consumer
< 0) {
173 handle
->ust64_consumer
= -1;
175 if (kernel_channel_rotate_pipe
) {
176 handle
->kernel_consumer
=
177 lttng_pipe_release_readfd(
178 kernel_channel_rotate_pipe
);
179 if (handle
->kernel_consumer
< 0) {
183 handle
->kernel_consumer
= -1;
185 handle
->thread_quit_pipe
= thread_quit_pipe
;
186 handle
->rotation_timer_queue
= rotation_timer_queue
;
191 rotation_thread_handle_destroy(handle
);
196 int init_poll_set(struct lttng_poll_event
*poll_set
,
197 struct rotation_thread_handle
*handle
)
202 * Create pollset with size 5:
203 * - sessiond quit pipe
204 * - sessiond timer pipe,
205 * - consumerd (32-bit user space) channel rotate pipe,
206 * - consumerd (64-bit user space) channel rotate pipe,
207 * - consumerd (kernel) channel rotate pipe,
209 ret
= lttng_poll_create(poll_set
, 5, LTTNG_CLOEXEC
);
214 ret
= lttng_poll_add(poll_set
, handle
->thread_quit_pipe
,
217 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
220 ret
= lttng_poll_add(poll_set
,
221 lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
),
224 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
227 ret
= lttng_poll_add(poll_set
, handle
->ust32_consumer
,
230 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
233 ret
= lttng_poll_add(poll_set
, handle
->ust64_consumer
,
236 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
239 if (handle
->kernel_consumer
>= 0) {
240 ret
= lttng_poll_add(poll_set
, handle
->kernel_consumer
,
243 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
251 lttng_poll_clean(poll_set
);
256 void fini_thread_state(struct rotation_thread_state
*state
)
258 lttng_poll_clean(&state
->events
);
259 cds_lfht_destroy(channel_pending_rotate_ht
, NULL
);
263 int init_thread_state(struct rotation_thread_handle
*handle
,
264 struct rotation_thread_state
*state
)
268 memset(state
, 0, sizeof(*state
));
269 lttng_poll_init(&state
->events
);
271 ret
= init_poll_set(&state
->events
, handle
);
273 ERR("[rotation-thread] Failed to initialize rotation thread poll set");
277 channel_pending_rotate_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
278 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
279 if (!channel_pending_rotate_ht
) {
280 ERR("[rotation-thread] Failed to create channel pending rotation hash table");
290 int handle_channel_rotation_pipe(int fd
, uint32_t revents
,
291 struct rotation_thread_handle
*handle
,
292 struct rotation_thread_state
*state
)
295 enum lttng_domain_type domain
;
296 struct rotation_channel_info
*channel_info
;
297 struct ltt_session
*session
= NULL
;
300 if (fd
== handle
->ust32_consumer
||
301 fd
== handle
->ust64_consumer
) {
302 domain
= LTTNG_DOMAIN_UST
;
303 } else if (fd
== handle
->kernel_consumer
) {
304 domain
= LTTNG_DOMAIN_KERNEL
;
306 ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
311 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
312 ret
= lttng_poll_del(&state
->events
, fd
);
314 ERR("[rotation-thread] Failed to remove consumer "
315 "rotation pipe from poll set");
321 ret
= read(fd
, &key
, sizeof(key
));
322 } while (ret
== -1 && errno
== EINTR
);
323 if (ret
!= sizeof(key
)) {
324 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
330 DBG("[rotation-thread] Received notification for chan %" PRIu64
331 ", domain %d\n", key
, domain
);
333 channel_info
= lookup_channel_pending(key
, domain
);
335 ERR("[rotation-thread] Failed to find channel_info (key = %"
342 session
= session_find_by_id(channel_info
->session_id
);
345 * The session may have been destroyed before we had a chance to
346 * perform this action, return gracefully.
348 DBG("[rotation-thread] Session %" PRIu64
" not found",
349 channel_info
->session_id
);
351 goto end_unlock_session_list
;
354 session_lock(session
);
355 if (--session
->nr_chan_rotate_pending
== 0) {
356 time_t now
= time(NULL
);
358 if (now
== (time_t) -1) {
359 session
->rotation_state
= LTTNG_ROTATION_STATE_ERROR
;
361 goto end_unlock_session
;
364 ret
= rename_complete_chunk(session
, now
);
366 ERR("Failed to rename completed rotation chunk");
367 goto end_unlock_session
;
369 session
->rotate_pending
= false;
370 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
371 session
->last_chunk_start_ts
= session
->current_chunk_start_ts
;
372 if (session
->rotate_pending_relay
) {
373 ret
= sessiond_timer_rotate_pending_start(
375 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
377 ERR("Failed to enable rotate pending timer");
379 goto end_unlock_session
;
382 DBG("Rotation completed for session %s", session
->name
);
388 channel_rotation_info_destroy(channel_info
);
389 session_unlock(session
);
390 end_unlock_session_list
:
391 session_unlock_list();
398 * Process the rotate_pending check, called with session lock held.
401 int rotate_pending_relay_timer(struct ltt_session
*session
)
405 DBG("[rotation-thread] Check rotate pending on session %" PRIu64
,
407 ret
= relay_rotate_pending(session
, session
->rotate_count
- 1);
409 ERR("[rotation-thread] Check relay rotate pending");
413 DBG("[rotation-thread] Rotation completed on the relay for "
414 "session %" PRIu64
, session
->id
);
416 * Now we can clear the pending flag in the session. New
417 * rotations can start now.
419 session
->rotate_pending_relay
= false;
420 } else if (ret
== 1) {
421 DBG("[rotation-thread] Rotation still pending on the relay for "
422 "session %" PRIu64
, session
->id
);
423 ret
= sessiond_timer_rotate_pending_start(session
,
424 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
426 ERR("Re-enabling rotate pending timer");
439 int handle_rotate_timer_pipe(uint32_t revents
,
440 struct rotation_thread_handle
*handle
,
441 struct rotation_thread_state
*state
,
442 struct rotation_thread_timer_queue
*queue
)
445 int fd
= lttng_pipe_get_readfd(queue
->event_pipe
);
446 struct ltt_session
*session
;
449 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
450 ret
= lttng_poll_del(&state
->events
, fd
);
452 ERR("[rotation-thread] Failed to remove consumer "
453 "rotate pending pipe from poll set");
458 ret
= lttng_read(fd
, buf
, 1);
460 ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd
);
466 struct sessiond_rotation_timer
*timer_data
;
469 * Take the queue lock only to pop elements from the list.
471 pthread_mutex_lock(&queue
->lock
);
472 if (cds_list_empty(&queue
->list
)) {
473 pthread_mutex_unlock(&queue
->lock
);
476 timer_data
= cds_list_first_entry(&queue
->list
,
477 struct sessiond_rotation_timer
, head
);
478 cds_list_del(&timer_data
->head
);
479 pthread_mutex_unlock(&queue
->lock
);
482 * session lock to lookup the session ID.
485 session
= session_find_by_id(timer_data
->session_id
);
487 DBG("[rotation-thread] Session %" PRIu64
" not found",
488 timer_data
->session_id
);
490 * This is a non-fatal error, and we cannot report it to the
491 * user (timer), so just print the error and continue the
494 session_unlock_list();
500 * Take the session lock and release the session_list lock.
502 session_lock(session
);
503 session_unlock_list();
505 if (timer_data
->signal
== LTTNG_SESSIOND_SIG_ROTATE_PENDING
) {
506 ret
= rotate_pending_relay_timer(session
);
508 ERR("Unknown signal in rotate timer %d", timer_data
->signal
);
511 session_unlock(session
);
514 ERR("Error processing timer");
525 void *thread_rotation(void *data
)
528 struct rotation_thread_handle
*handle
= data
;
529 struct rotation_thread_state state
;
531 DBG("[rotation-thread] Started rotation thread");
534 ERR("[rotation-thread] Invalid thread context provided");
538 rcu_register_thread();
541 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_ROTATION
);
542 health_code_update();
544 ret
= init_thread_state(handle
, &state
);
549 /* Ready to handle client connections. */
550 sessiond_notify_ready();
556 DBG("[rotation-thread] Entering poll wait");
557 ret
= lttng_poll_wait(&state
.events
, -1);
558 DBG("[rotation-thread] Poll wait returned (%i)", ret
);
562 * Restart interrupted system call.
564 if (errno
== EINTR
) {
567 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret
);
572 for (i
= 0; i
< fd_count
; i
++) {
573 int fd
= LTTNG_POLL_GETFD(&state
.events
, i
);
574 uint32_t revents
= LTTNG_POLL_GETEV(&state
.events
, i
);
576 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
579 if (fd
== handle
->thread_quit_pipe
) {
580 DBG("[rotation-thread] Quit pipe activity");
582 } else if (fd
== lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
)) {
583 ret
= handle_rotate_timer_pipe(revents
,
584 handle
, &state
, handle
->rotation_timer_queue
);
586 ERR("[rotation-thread] Failed to handle rotation timer pipe event");
589 } else if (fd
== handle
->ust32_consumer
||
590 fd
== handle
->ust64_consumer
||
591 fd
== handle
->kernel_consumer
) {
592 ret
= handle_channel_rotation_pipe(fd
,
593 revents
, handle
, &state
);
595 ERR("[rotation-thread] Handle channel rotation pipe");
603 DBG("[rotation-thread] Exit");
604 fini_thread_state(&state
);
605 health_unregister(health_sessiond
);
606 rcu_thread_offline();
607 rcu_unregister_thread();