/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *               2012 - David Goulet <dgoulet@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <sys/socket.h>
#include <sys/types.h>

#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/utils.h>
#include <common/time.h>
#include <common/compat/poll.h>
#include <common/compat/endian.h>
#include <common/index/index.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/kernel-consumer/kernel-consumer.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
#include <common/consumer/consumer-timer.h>
#include <common/consumer/consumer.h>
#include <common/consumer/consumer-stream.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/align.h>
#include <common/consumer/consumer-metadata-cache.h>
#include <common/trace-chunk.h>
#include <common/trace-chunk-registry.h>
#include <common/string-utils/format.h>
#include <common/dynamic-array.h>
struct lttng_consumer_global_data consumer_data = {
	.type = LTTNG_CONSUMER_UNKNOWN,
};

enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};

struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan;	/* add */
	uint64_t key;				/* del */
};

/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;
/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds have hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
int consumer_quit;

/*
 * Global hash table containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll thread
 * for the metadata and the data poll thread for the data.
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
/*
 * Notify a thread lttng pipe to poll back again. This usually means that some
 * global state has changed so we just send back the thread in a poll wait
 * call.
 */
static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
{
	struct lttng_consumer_stream *null_stream = NULL;

	(void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
}
static void notify_health_quit_pipe(int *pipe)
{
	ssize_t ret;

	ret = lttng_write(pipe[1], "4", 1);
	if (ret < 1) {
		PERROR("write consumer health quit");
	}
}
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_channel *chan,
		uint64_t key,
		enum consumer_channel_action action)
{
	struct consumer_channel_msg msg;
	ssize_t ret;

	memset(&msg, 0, sizeof(msg));

	msg.action = action;
	msg.chan = chan;
	msg.key = key;
	ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
	if (ret < sizeof(msg)) {
		PERROR("notify_channel_pipe write error");
	}
}

void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
		uint64_t key)
{
	notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_channel **chan,
		uint64_t *key,
		enum consumer_channel_action *action)
{
	struct consumer_channel_msg msg;
	ssize_t ret;

	ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
	if (ret < sizeof(msg)) {
		ret = -1;
		goto error;
	}
	*action = msg.action;
	*chan = msg.chan;
	*key = msg.key;
error:
	return (int) ret;
}
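/*
 * Illustrative sketch (not part of the original source): the channel pipe is a
 * small message protocol between threads. A command thread publishes an action
 * with notify_channel_pipe() and the channel-management thread consumes it
 * with read_channel_pipe(). Variable names below are hypothetical and error
 * handling is elided.
 *
 *	struct lttng_consumer_channel *chan;
 *	uint64_t key;
 *	enum consumer_channel_action action;
 *
 *	notify_channel_pipe(ctx, new_channel, -1, CONSUMER_CHANNEL_ADD);
 *	...
 *	read_channel_pipe(ctx, &chan, &key, &action);
 *	switch (action) {
 *	case CONSUMER_CHANNEL_ADD:
 *		...
 *	}
 */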
/*
 * Cleanup the stream list of a channel. Those streams are not yet globally
 * visible.
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {
		cds_list_del(&stream->send_node);
		/*
		 * Once a stream is added to this list, the buffers were created so we
		 * have a guarantee that this call will succeed. Setting the monitor
		 * mode to 0 so we don't lock nor try to delete the stream from the
		 * global hash table.
		 */
		stream->monitor = 0;
		consumer_stream_destroy(stream, NULL);
	}
}
/*
 * Find a stream. The consumer_data.lock must be locked during this
 * call.
 */
static struct lttng_consumer_stream *find_stream(uint64_t key,
		struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_consumer_stream *stream = NULL;

	/* -1ULL keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return NULL;
	}

	lttng_ht_lookup(ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		stream = caa_container_of(node, struct lttng_consumer_stream, node);
	}

	return stream;
}
static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
{
	struct lttng_consumer_stream *stream;

	stream = find_stream(key, ht);
	if (stream) {
		stream->key = (uint64_t) -1ULL;
		/*
		 * We don't want the lookup to match, but we still need
		 * to iterate on this stream when iterating over the hash table. Just
		 * change the node key.
		 */
		stream->node.key = (uint64_t) -1ULL;
	}
}
/*
 * Return a channel object for the given key.
 *
 * RCU read side lock MUST be acquired before calling this function and
 * protects the channel ptr.
 */
struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_consumer_channel *channel = NULL;

	/* -1ULL keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return NULL;
	}

	lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		channel = caa_container_of(node, struct lttng_consumer_channel, node);
	}

	return channel;
}
/*
 * There is a possibility that the consumer does not have enough time between
 * the close of the channel on the session daemon and the cleanup in here thus
 * once we have a channel add with an existing key, we know for sure that this
 * channel will eventually get cleaned up by all streams being closed.
 *
 * This function just nullifies the already existing channel key.
 */
static void steal_channel_key(uint64_t key)
{
	struct lttng_consumer_channel *channel;

	rcu_read_lock();
	channel = consumer_find_channel(key);
	if (channel) {
		channel->key = (uint64_t) -1ULL;
		/*
		 * We don't want the lookup to match, but we still need to iterate on
		 * this channel when iterating over the hash table. Just change the
		 * node key.
		 */
		channel->node.key = (uint64_t) -1ULL;
	}
	rcu_read_unlock();
}
static void free_channel_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node =
		caa_container_of(head, struct lttng_ht_node_u64, head);
	struct lttng_consumer_channel *channel =
		caa_container_of(node, struct lttng_consumer_channel, node);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_free_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		break;
	}
	free(channel);
}
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node =
		caa_container_of(head, struct lttng_ht_node_u64, head);
	struct consumer_relayd_sock_pair *relayd =
		caa_container_of(node, struct consumer_relayd_sock_pair, node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
/*
 * Destroy and free relayd socket pair object.
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == NULL) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		break;
	}

	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = NULL;

	if (channel->is_published) {
		int ret;

		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(consumer_data.channel_ht, &iter);
		assert(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
				&iter);
		assert(!ret);
	}

	call_rcu(&channel->node.head, free_channel_rcu);

	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht(void)
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
			node.node) {
		consumer_destroy_relayd(relayd);
	}

	lttng_ht_destroy(consumer_data.relayd_ht);
}
/*
 * Update the end point status of all streams having the given network sequence
 * index (relayd index).
 *
 * It's atomically set without having the stream mutex locked which is fine
 * because we handle the write/read race with a pipe wakeup for each thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
		enum consumer_endpoint_status status)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

	/* Let's begin with metadata */
	cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
		}
	}

	/* Follow up by the data streams */
	cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to data stream %d", stream->wait_fd);
		}
	}
}
/*
 * Cleanup a relayd object by flagging every associated stream for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
	uint64_t netidx;

	DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
	notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
/*
 * Flag a relayd socket pair for destruction. Destroy it if the refcount
 * reaches zero.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
/*
 * Completely destroy stream from every visible data structure and the given
 * hash table if one.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}

/*
 * XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}

void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
void consumer_stream_update_channel_attributes(
		struct lttng_consumer_stream *stream,
		struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size =
			channel->tracefile_size;
}
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
		uint64_t stream_key,
		const char *channel_name,
		uint64_t relayd_id,
		uint64_t session_id,
		struct lttng_trace_chunk *trace_chunk,
		int cpu,
		enum consumer_channel_type type,
		unsigned int monitor)
{
	int ret;
	struct lttng_consumer_stream *stream;

	stream = zmalloc(sizeof(*stream));
	if (stream == NULL) {
		PERROR("malloc struct lttng_consumer_stream");
		goto error;
	}

	if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
		ERR("Failed to acquire trace chunk reference during the creation of a stream");
		goto error;
	}

	stream->key = stream_key;
	stream->trace_chunk = trace_chunk;
	stream->out_fd_offset = 0;
	stream->output_written = 0;
	stream->net_seq_idx = relayd_id;
	stream->session_id = session_id;
	stream->monitor = monitor;
	stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
	stream->index_file = NULL;
	stream->last_sequence_number = -1ULL;
	pthread_mutex_init(&stream->lock, NULL);
	pthread_mutex_init(&stream->metadata_timer_lock, NULL);

	/* If channel is the metadata, flag this stream as metadata. */
	if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
		stream->metadata_flag = 1;
		/* Metadata is flat out. */
		strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
		/* Live rendez-vous point. */
		pthread_cond_init(&stream->metadata_rdv, NULL);
		pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
	} else {
		/* Format stream name to <channel_name>_<cpu_number> */
		ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
				channel_name, cpu);
		if (ret < 0) {
			PERROR("snprintf stream name");
			goto error;
		}
	}

	/* Key is always the wait_fd for streams. */
	lttng_ht_node_init_u64(&stream->node, stream->key);

	/* Init node per channel id key */
	lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);

	/* Init session id node with the stream session id */
	lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);

	DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
			" relayd_id %" PRIu64 ", session_id %" PRIu64,
			stream->name, stream->key, channel_key,
			stream->net_seq_idx, stream->session_id);

	return stream;

error:
	if (stream) {
		lttng_trace_chunk_put(stream->trace_chunk);
		free(stream);
	}
	return NULL;
}
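/*
 * Illustrative sketch (not part of the original source): a data stream is
 * typically allocated and then published in the global hash tables. Argument
 * values are hypothetical and most parameters are elided; error handling is
 * omitted.
 *
 *	struct lttng_consumer_stream *stream;
 *
 *	stream = consumer_allocate_stream(channel->key, stream_key,
 *			channel->name, ...);
 *	if (stream) {
 *		consumer_add_data_stream(stream);
 *	}
 */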
/*
 * Add a stream to the global list protected by a mutex.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		uatomic_inc(&stream->chan->refcount);
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}

void consumer_del_data_stream(struct lttng_consumer_stream *stream)
{
	consumer_del_stream(stream, data_ht);
}
/*
 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
 * be acquired before calling this.
 */
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	lttng_ht_lookup(consumer_data.relayd_ht,
			&relayd->net_seq_idx, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node == NULL) {
		lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
	}

	return ret;
}
/*
 * Allocate and return a consumer relayd socket.
 */
static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
		uint64_t net_seq_idx)
{
	struct consumer_relayd_sock_pair *obj = NULL;

	/* net sequence index of -1 is a failure */
	if (net_seq_idx == (uint64_t) -1ULL) {
		goto error;
	}

	obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
	if (obj == NULL) {
		PERROR("zmalloc relayd sock");
		goto error;
	}

	obj->net_seq_idx = net_seq_idx;
	obj->destroy_flag = 0;
	obj->control_sock.sock.fd = -1;
	obj->data_sock.sock.fd = -1;
	lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
	pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);

error:
	return obj;
}
/*
 * Find a relayd socket pair in the global consumer data.
 *
 * Return the object if found else NULL.
 * RCU read-side lock must be held across this call and while using the
 * returned object.
 */
struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct consumer_relayd_sock_pair *relayd = NULL;

	/* Negative keys are lookup failures */
	if (key == (uint64_t) -1ULL) {
		return NULL;
	}

	lttng_ht_lookup(consumer_data.relayd_ht, &key,
			&iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
	}

	return relayd;
}
/*
 * Find a relayd and send the stream
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
		char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream->net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock, stream->name,
				path, &stream->relayd_stream_id,
				stream->chan->tracefile_size,
				stream->chan->tracefile_count,
				stream->trace_chunk);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}

		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
				stream->key, stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
			stream->name, stream->key, stream->net_seq_idx);

end:
	return ret;
}
/*
 * Find a relayd and send the streams sent message
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_streams_sent(&relayd->control_sock);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}
	} else {
		ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
				net_seq_idx);
		ret = -1;
		goto end;
	}

	ret = 0;
	DBG("All streams sent relayd id %" PRIu64, net_seq_idx);

end:
	return ret;
}
/*
 * Find a relayd and close the stream
 */
void close_relayd_stream(struct lttng_consumer_stream *stream)
{
	struct consumer_relayd_sock_pair *relayd;

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd) {
		consumer_stream_relayd_close(stream, relayd);
	}
	rcu_read_unlock();
}
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
		size_t data_size, unsigned long padding,
		struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * incremented. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be subtracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
				sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
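/*
 * Note added for clarity (not in the original source): every field written
 * into lttcomm_relayd_data_hdr above (stream_id, data_size, padding_size,
 * net_seq_num) is converted to big-endian with htobe64()/htobe32() before
 * being sent, so the relay daemon can decode the header regardless of the
 * host byte order. Metadata takes the control socket path instead of the
 * data socket.
 */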
/*
 * Trigger a dump of the metadata content. Following/during the successful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 */
static
int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	assert(stream->metadata_flag);
	assert(stream->chan->trace_chunk);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
int lttng_consumer_channel_set_trace_chunk(
		struct lttng_consumer_channel *channel,
		struct lttng_trace_chunk *new_trace_chunk)
{
	int ret = 0;
	const bool is_local_trace = channel->relayd_id == -1ULL;
	bool update_stream_trace_chunk;
	struct cds_lfht_iter iter;
	struct lttng_consumer_stream *stream;
	unsigned long channel_hash;

	pthread_mutex_lock(&channel->lock);
	/*
	 * A stream can transition to a state where it and its channel
	 * no longer belong to a trace chunk. For instance, this happens when
	 * a session is rotated while it is inactive. After the rotation
	 * of an inactive session completes, the channel and its streams no
	 * longer belong to a trace chunk.
	 *
	 * However, if a session is stopped, rotated, and started again,
	 * the session daemon will create a new chunk and send it to its peers.
	 * In that case, the streams' transition to a new chunk can be performed
	 * immediately.
	 *
	 * This trace chunk transition could also be performed lazily when
	 * a buffer is consumed. However, creating the files here allows the
	 * consumer daemon to report any creation error to the session daemon
	 * and cause the start of the tracing session to fail.
	 */
	update_stream_trace_chunk = !channel->trace_chunk && new_trace_chunk;

	/*
	 * The acquisition of the reference cannot fail (barring
	 * a severe internal error) since a reference to the published
	 * chunk is already held by the caller.
	 */
	if (new_trace_chunk) {
		const bool acquired_reference = lttng_trace_chunk_get(
				new_trace_chunk);

		assert(acquired_reference);
	}

	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = new_trace_chunk;
	if (!is_local_trace || !new_trace_chunk) {
		goto end;
	}

	if (!update_stream_trace_chunk) {
		goto end;
	}

	channel_hash = consumer_data.stream_per_chan_id_ht->hash_fct(
			&channel->key, lttng_ht_seed);
	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(consumer_data.stream_per_chan_id_ht->ht,
			channel_hash,
			consumer_data.stream_per_chan_id_ht->match_fct,
			&channel->key, &iter, stream, node_channel_id.node) {
		bool acquired_reference, should_regenerate_metadata = false;

		acquired_reference = lttng_trace_chunk_get(channel->trace_chunk);
		assert(acquired_reference);

		pthread_mutex_lock(&stream->lock);

		/*
		 * On a transition from "no-chunk" to a new chunk, a metadata
		 * stream's content must be entirely dumped. This must occur
		 * _after_ the creation of the metadata stream's output files
		 * as the consumption thread (not necessarily the one executing
		 * this) may start to consume during the call to
		 * consumer_metadata_stream_dump().
		 */
		should_regenerate_metadata =
				stream->metadata_flag &&
				!stream->trace_chunk && channel->trace_chunk;
		stream->trace_chunk = channel->trace_chunk;
		ret = consumer_stream_create_output_files(stream, true);
		if (ret) {
			pthread_mutex_unlock(&stream->lock);
			goto end_rcu_unlock;
		}
		if (should_regenerate_metadata) {
			ret = consumer_metadata_stream_dump(stream);
			if (ret) {
				pthread_mutex_unlock(&stream->lock);
				goto end_rcu_unlock;
			}
		}
		pthread_mutex_unlock(&stream->lock);
	}
end_rcu_unlock:
	rcu_read_unlock();
end:
	pthread_mutex_unlock(&channel->lock);
	return ret;
}
/*
 * Allocate and return a new lttng_consumer_channel object using the given key
 * to initialize the hash table node.
 *
 * On error, return NULL.
 */
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
		uint64_t session_id,
		const uint64_t *chunk_id,
		const char *pathname,
		const char *name,
		uint64_t relayd_id,
		enum lttng_event_output output,
		uint64_t tracefile_size,
		uint64_t tracefile_count,
		uint64_t session_id_per_pid,
		unsigned int monitor,
		unsigned int live_timer_interval,
		const char *root_shm_path,
		const char *shm_path)
{
	struct lttng_consumer_channel *channel = NULL;
	struct lttng_trace_chunk *trace_chunk = NULL;

	if (chunk_id) {
		trace_chunk = lttng_trace_chunk_registry_find_chunk(
				consumer_data.chunk_registry, session_id,
				*chunk_id);
		if (!trace_chunk) {
			ERR("Failed to find trace chunk reference during creation of channel");
			goto end;
		}
	}

	channel = zmalloc(sizeof(*channel));
	if (channel == NULL) {
		PERROR("malloc struct lttng_consumer_channel");
		goto end;
	}

	channel->key = key;
	channel->refcount = 0;
	channel->session_id = session_id;
	channel->session_id_per_pid = session_id_per_pid;
	channel->relayd_id = relayd_id;
	channel->tracefile_size = tracefile_size;
	channel->tracefile_count = tracefile_count;
	channel->monitor = monitor;
	channel->live_timer_interval = live_timer_interval;
	pthread_mutex_init(&channel->lock, NULL);
	pthread_mutex_init(&channel->timer_lock, NULL);

	switch (output) {
	case LTTNG_EVENT_SPLICE:
		channel->output = CONSUMER_CHANNEL_SPLICE;
		break;
	case LTTNG_EVENT_MMAP:
		channel->output = CONSUMER_CHANNEL_MMAP;
		break;
	default:
		free(channel);
		channel = NULL;
		goto end;
	}

	/*
	 * In monitor mode, the streams associated with the channel will be put in
	 * a special list ONLY owned by this channel. So, the refcount is set to 1
	 * here meaning that the channel itself has streams that are referenced.
	 *
	 * On a channel deletion, once the channel is no longer visible, the
	 * refcount is decremented and checked for a zero value to delete it. With
	 * streams in no monitor mode, it will now be safe to destroy the channel.
	 */
	if (!channel->monitor) {
		channel->refcount = 1;
	}

	strncpy(channel->pathname, pathname, sizeof(channel->pathname));
	channel->pathname[sizeof(channel->pathname) - 1] = '\0';

	strncpy(channel->name, name, sizeof(channel->name));
	channel->name[sizeof(channel->name) - 1] = '\0';

	if (root_shm_path) {
		strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
		channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
	}
	if (shm_path) {
		strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
		channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
	}

	lttng_ht_node_init_u64(&channel->node, channel->key);
	lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
			channel->session_id);

	channel->wait_fd = -1;
	CDS_INIT_LIST_HEAD(&channel->streams.head);

	if (trace_chunk) {
		int ret = lttng_consumer_channel_set_trace_chunk(channel,
				trace_chunk);
		if (ret) {
			goto error;
		}
	}

	DBG("Allocated channel (key %" PRIu64 ")", channel->key);

end:
	lttng_trace_chunk_put(trace_chunk);
	return channel;
error:
	consumer_del_channel(channel);
	channel = NULL;
	goto end;
}
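/*
 * Illustrative sketch (not part of the original source): a channel is usually
 * allocated and then published so that the rest of the consumer can look it up
 * by key. Argument values are hypothetical and most parameters are elided.
 *
 *	struct lttng_consumer_channel *channel;
 *
 *	channel = consumer_allocate_channel(key, session_id, chunk_id,
 *			pathname, name, relayd_id, output, ...);
 *	if (channel) {
 *		consumer_add_channel(channel, ctx);
 *	}
 *
 * Once published, consumer_find_channel(key) returns it under the RCU read
 * side lock.
 */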
/*
 * Add a channel to the global list protected by a mutex.
 *
 * Always return 0 indicating success.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);

	/*
	 * This gives us a guarantee that the channel we are about to add to the
	 * channel hash table will be unique. See this function comment on the why
	 * we need to steal the channel key at this stage.
	 */
	steal_channel_key(channel->key);

	lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
	lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
			&channel->channels_by_session_id_ht_node);
	channel->is_published = true;

	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}

	return 0;
}
/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and concurrency issues when writing is
 * needed. Called with consumer_data.lock held.
 *
 * Returns the number of fds in the structures.
 */
static int update_poll_array(struct lttng_consumer_local_data *ctx,
		struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
		struct lttng_ht *ht, int *nb_inactive_fd)
{
	int i = 0;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	assert(local_stream);

	DBG("Updating poll fd array");
	*nb_inactive_fd = 0;
	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
		/*
		 * Only active streams with an active end point can be added to the
		 * poll set and local stream storage of the thread.
		 *
		 * There is a potential race here for endpoint_status to be updated
		 * just after the check. However, this is OK since the stream(s) will
		 * be deleted once the thread is notified that the end point state has
		 * changed where this function will be called back again.
		 *
		 * We track the number of inactive FDs because they still need to be
		 * closed by the polling thread after a wakeup on the data_pipe or
		 * metadata_pipe.
		 */
		if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
			(*nb_inactive_fd)++;
			continue;
		}
		/*
		 * This clobbers way too much the debug output. Uncomment that if you
		 * need it for debugging purposes.
		 */
		(*pollfd)[i].fd = stream->wait_fd;
		(*pollfd)[i].events = POLLIN | POLLPRI;
		local_stream[i] = stream;
		i++;
	}

	/*
	 * Insert the consumer_data_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD.
	 */
	(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
	(*pollfd)[i].events = POLLIN | POLLPRI;

	(*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
	(*pollfd)[i + 1].events = POLLIN | POLLPRI;
	return i;
}
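/*
 * Layout sketch added for clarity (not in the original source): for N active
 * streams, the pollfd array filled by update_poll_array() looks like:
 *
 *	[0 .. N-1]  stream wait_fd entries (POLLIN | POLLPRI)
 *	[N]         consumer_data_pipe read end
 *	[N + 1]     consumer_wakeup_pipe read end
 *
 * The returned count covers only the stream entries, so the caller is
 * expected to account for the two internal pipes when sizing and polling the
 * array.
 */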
/*
 * Poll on the should_quit pipe and the command socket return -1 on
 * error, 1 if should exit, 0 if data is available on the command socket
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

restart:
	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		/*
		 * Restart interrupted system call.
		 */
		if (errno == EINTR) {
			goto restart;
		}
		PERROR("Poll error");
		return -1;
	}
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return 1;
	}
	return 0;
}
/*
 * Set the error socket.
 */
void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
		int sock)
{
	ctx->consumer_error_socket = sock;
}

/*
 * Set the command socket path.
 */
void lttng_consumer_set_command_sock_path(
		struct lttng_consumer_local_data *ctx, char *sock)
{
	ctx->consumer_command_sock_path = sock;
}
/*
 * Send return code to the session daemon.
 * If the socket is not defined, we return 0, it is not a fatal error
 */
int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
	if (ctx->consumer_error_socket > 0) {
		return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}
/*
 * Close all the tracefiles and stream fds and MUST be called when all
 * instances are destroyed i.e. when all threads were joined and are ended.
 */
void lttng_consumer_cleanup(void)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
			node.node) {
		consumer_del_channel(channel);
	}

	lttng_ht_destroy(consumer_data.channel_ht);
	lttng_ht_destroy(consumer_data.channels_by_session_id_ht);

	cleanup_relayd_ht();

	lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);

	/*
	 * This HT contains streams that are freed by either the metadata thread or
	 * the data thread so we do *nothing* on the hash table and simply destroy
	 * it.
	 */
	lttng_ht_destroy(consumer_data.stream_list_ht);

	lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
}
/*
 * Called from signal handler.
 */
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
	ssize_t ret;

	CMM_STORE_SHARED(consumer_quit, 1);
	ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
	if (ret < 1) {
		PERROR("write consumer quit");
	}

	DBG("Consumer flag that it should quit");
}
/*
 * Flush pending writes to trace output disk file.
 */
void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
		off_t orig_offset)
{
	int ret;
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->max_sb_size) {
		return;
	}
	lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
	 * we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size, POSIX_FADV_DONTNEED);
	if (ret && ret != -ENOSYS) {
		errno = ret;
		PERROR("posix_fadvise on fd %i", outfd);
	}
}
/*
 * Initialise the necessary environment:
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(uint64_t stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		PERROR("allocating context");
		goto error;
	}

	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_wakeup_pipe) {
		goto error_wakeup_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	ctx->channel_monitor_pipe = -1;

	return ctx;

error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}
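/*
 * Illustrative sketch (not part of the original source): a consumer daemon
 * creates its local context once, wiring in the callbacks for its domain.
 * The callback names below are hypothetical.
 *
 *	struct lttng_consumer_local_data *ctx;
 *
 *	ctx = lttng_consumer_create(LTTNG_CONSUMER_KERNEL,
 *			my_buffer_ready, my_recv_channel,
 *			my_recv_stream, my_update_stream);
 *	if (!ctx) {
 *		(handle allocation or pipe creation failure)
 *	}
 */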
/*
 * Iterate over all streams of the hashtable and free them properly.
 */
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
		/*
		 * Ignore return value since we are currently cleaning up so any error
		 * can't be handled.
		 */
		(void) consumer_del_stream(stream, ht);
	}

	lttng_ht_destroy(ht);
}

/*
 * Iterate over all streams of the metadata hashtable and free them
 * properly.
 */
static void destroy_metadata_stream_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
		/*
		 * Ignore return value since we are currently cleaning up so any error
		 * can't be handled.
		 */
		(void) consumer_del_metadata_stream(stream, ht);
	}

	lttng_ht_destroy(ht);
}
/*
 * Close all fds associated with the instance and free the context.
 */
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
{
	int ret;

	DBG("Consumer destroying it. Closing everything.");

	destroy_data_stream_ht(data_ht);
	destroy_metadata_stream_ht(metadata_ht);

	ret = close(ctx->consumer_error_socket);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_metadata_socket);
	if (ret) {
		PERROR("close");
	}
	utils_close_pipe(ctx->consumer_channel_pipe);
	lttng_pipe_destroy(ctx->consumer_data_pipe);
	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
	utils_close_pipe(ctx->consumer_should_quit);

	unlink(ctx->consumer_command_sock_path);
	free(ctx);
}
/*
 * Write the metadata stream id on the specified file descriptor.
 */
static int write_relayd_metadata_id(int fd,
		struct lttng_consumer_stream *stream,
		unsigned long padding)
{
	ssize_t ret;
	struct lttcomm_relayd_metadata_payload hdr;

	hdr.stream_id = htobe64(stream->relayd_stream_id);
	hdr.padding_size = htobe32(padding);
	ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
	if (ret < sizeof(hdr)) {
		/*
		 * This error means that the fd's end is closed so ignore the PERROR
		 * not to clobber the error output since this can happen in a normal
		 * code path.
		 */
		if (errno != EPIPE) {
			PERROR("write metadata stream id");
		}
		DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
		/*
		 * Set ret to a negative value because if ret != sizeof(hdr), we don't
		 * handle writing the missing part so report that as an error and
		 * don't lie to the caller.
		 */
		ret = -1;
		goto end;
	}

	DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
			stream->relayd_stream_id, padding);

end:
	return (int) ret;
}
/*
 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
 * core function for writing trace buffers to either the local filesystem or
 * the network.
 *
 * It must be called with the stream and the channel lock held.
 *
 * Careful review MUST be put if any changes occur!
 *
 * Returns the number of bytes written
 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len,
		unsigned long padding,
		struct ctf_packet_index *index)
{
	unsigned long mmap_offset;
	void *mmap_base;
	ssize_t ret = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	unsigned int relayd_hang_up = 0;

	/* RCU lock for the relayd pointer */
	rcu_read_lock();
	assert(stream->net_seq_idx != (uint64_t) -1ULL ||
			stream->trace_chunk);

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			ret = -EPIPE;
			goto end;
		}
	}

	/* get the offset inside the fd to mmap */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		mmap_base = stream->mmap_base;
		ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
		if (ret < 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			goto end;
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		mmap_base = lttng_ustctl_get_mmap_base(stream);
		if (!mmap_base) {
			ERR("read mmap get mmap base for stream %s", stream->name);
			ret = -EPERM;
			goto end;
		}
		ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
		if (ret != 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			goto end;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = len;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret < 0) {
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;

		/* Write metadata stream id before payload */
		if (stream->metadata_flag) {
			ret = write_relayd_metadata_id(outfd, stream, padding);
			if (ret < 0) {
				relayd_hang_up = 1;
				goto write_error;
			}
		}
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
				(stream->tracefile_size_current + len) >
				stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret) {
				goto end;
			}
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += len;
		if (index) {
			index->offset = htobe64(stream->out_fd_offset);
		}
	}

	/*
	 * This call guarantee that len or less is returned. It's impossible to
	 * receive a ret value that is bigger than len.
	 */
	ret = lttng_write(outfd, mmap_base + mmap_offset, len);
	DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
	if (ret < 0 || ((size_t) ret != len)) {
		/*
		 * Report error to caller if nothing was written else at least send the
		 * amount written.
		 */
		if (ret < 0) {
			ret = -errno;
		}

		/* Socket operation failed. We consider the relayd dead */
		if (errno == EPIPE) {
			/*
			 * This is possible if the fd is closed on the other side
			 * (outfd) or any write problem. It can be verbose a bit for a
			 * normal execution if for instance the relayd is stopped
			 * abruptly. This can happen so set this to a DBG statement.
			 */
			DBG("Consumer mmap write detected relayd hang up");
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Unhandled error, print it and stop function right now. */
		PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
		goto end;
	}
	stream->output_written += ret;

	/* This call is useless on a socket so better save a syscall. */
	if (!relayd) {
		/* This won't block, but will start writeout asynchronously */
		lttng_sync_file_range(outfd, stream->out_fd_offset, len,
				SYNC_FILE_RANGE_WRITE);
		stream->out_fd_offset += len;
	}
	lttng_consumer_sync_trace_file(stream, orig_offset);

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return ret;
}
/*
 * Splice the data from the ring buffer to the tracefile.
 *
 * It must be called with the stream lock held.
 *
 * Returns the number of bytes spliced.
 */
ssize_t lttng_consumer_on_read_subbuffer_splice(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len,
		unsigned long padding,
		struct ctf_packet_index *index)
{
	ssize_t ret = 0, written = 0, ret_splice = 0;
	loff_t offset = 0;
	off_t orig_offset = stream->out_fd_offset;
	int fd = stream->wait_fd;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	int *splice_pipe;
	unsigned int relayd_hang_up = 0;

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/* Not supported for user space tracing */
		return -ENOSYS;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

	/* RCU lock for the relayd pointer */
	rcu_read_lock();

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			written = -ret;
			goto end;
		}
	}
	splice_pipe = stream->splice_pipe;

	/* Write metadata stream id before payload */
	if (relayd) {
		unsigned long total_len = len;

		if (stream->metadata_flag) {
			/*
			 * Lock the control socket for the complete duration of the function
			 * since from this point on we will use the socket.
			 */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);

			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			ret = write_relayd_metadata_id(splice_pipe[1], stream,
					padding);
			if (ret < 0) {
				written = ret;
				relayd_hang_up = 1;
				goto write_error;
			}

			total_len += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, total_len, padding, relayd);
		if (ret < 0) {
			written = ret;
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}
		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
				(stream->tracefile_size_current + len) >
				stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret < 0) {
				written = ret;
				goto end;
			}
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += len;
		index->offset = htobe64(stream->out_fd_offset);
	}

	while (len > 0) {
		DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
				(unsigned long)offset, len, fd, splice_pipe[1]);
		ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe, ret %zd", ret_splice);
		if (ret_splice < 0) {
			ret = errno;
			written = -ret;
			PERROR("Error in relay splice");
			goto splice_error;
		}

		/* Handle stream on the relayd if the output is on the network */
		if (relayd && stream->metadata_flag) {
			size_t metadata_payload_size =
				sizeof(struct lttcomm_relayd_metadata_payload);

			/* Update counter to fit the spliced data */
			ret_splice += metadata_payload_size;
			len += metadata_payload_size;
			/*
			 * We do this so the return value can match the len passed as
			 * argument to this function.
			 */
			written -= metadata_payload_size;
		}

		/* Splice data out */
		ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
				ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
				outfd, ret_splice);
		if (ret_splice < 0) {
			ret = errno;
			written = -ret;
			relayd_hang_up = 1;
			goto write_error;
		} else if (ret_splice > len) {
			/*
			 * We don't expect this code path to be executed but you never know
			 * so this is an extra protection against a buggy splice().
			 */
			ret = errno;
			written += ret_splice;
			PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
					len);
			goto splice_error;
		} else {
			/* All good, update current len and continue. */
			len -= ret_splice;
		}

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
					SYNC_FILE_RANGE_WRITE);
			stream->out_fd_offset += ret_splice;
		}
		stream->output_written += ret_splice;
		written += ret_splice;
	}
	if (!relayd) {
		lttng_consumer_sync_trace_file(stream, orig_offset);
	}
	goto end;

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
		/* Skip splice error so the consumer does not fail */
		goto end;
	}

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EINVAL:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return written;
}
/*
 * Sample the snapshot positions for a specific fd
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_sample_snapshot_positions(stream);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_sample_snapshot_positions(stream);
	default:
		ERR("Unknown consumer_data type");
		return -ENOSYS;
	}
}

/*
 * Take a snapshot for a specific fd
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_take_snapshot(stream);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_take_snapshot(stream);
	default:
		ERR("Unknown consumer_data type");
		return -ENOSYS;
	}
}

/*
 * Get the produced position
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
		unsigned long *pos)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_get_produced_snapshot(stream, pos);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_get_produced_snapshot(stream, pos);
	default:
		ERR("Unknown consumer_data type");
		return -ENOSYS;
	}
}

/*
 * Get the consumed position (free-running counter position in bytes).
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
		unsigned long *pos)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_get_consumed_snapshot(stream, pos);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
	default:
		ERR("Unknown consumer_data type");
		return -ENOSYS;
	}
}
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
		int sock, struct pollfd *consumer_sockpoll)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
	default:
		ERR("Unknown consumer_data type");
		return -ENOSYS;
	}
}
void lttng_consumer_close_all_metadata(void)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * The Kernel consumer has a different metadata scheme so we don't
		 * close anything because the stream will be closed by the session
		 * daemon.
		 */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Close all metadata streams. The metadata hash table is passed and
		 * this call iterates over it by closing all wakeup fd. This is safe
		 * because at this point we are sure that the metadata producer is
		 * either dead or blocked.
		 */
		lttng_ustconsumer_close_all_metadata(metadata_ht);
		break;
	default:
		ERR("Unknown consumer_data type");
		break;
	}
}
/*
 * Clean up a metadata stream and free its memory.
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	struct lttng_consumer_channel *free_chan = NULL;

	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	assert(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->lock);
	if (stream->chan->metadata_cache) {
		/* Only applicable to userspace consumers. */
		pthread_mutex_lock(&stream->chan->metadata_cache->lock);
	}

	/* Remove any reference to that stream. */
	consumer_stream_delete(stream, ht);

	/* Close down everything including the relayd if one. */
	consumer_stream_close(stream);
	/* Destroy tracer buffers of the stream. */
	consumer_stream_destroy_buffers(stream);

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&stream->chan->refcount, 1)
			&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_chan = stream->chan;
	}

	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for a NULL
	 * pointer value.
	 */
	stream->chan->metadata_stream = NULL;

	if (stream->chan->metadata_cache) {
		pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	if (free_chan) {
		consumer_del_channel(free_chan);
	}

	lttng_trace_chunk_put(stream->trace_chunk);
	stream->trace_chunk = NULL;
	consumer_stream_free(stream);
}
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 */
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
    struct lttng_ht *ht = metadata_ht;
    struct lttng_ht_iter iter;
    struct lttng_ht_node_u64 *node;

    assert(stream);
    assert(ht);

    DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

    pthread_mutex_lock(&consumer_data.lock);
    pthread_mutex_lock(&stream->chan->lock);
    pthread_mutex_lock(&stream->chan->timer_lock);
    pthread_mutex_lock(&stream->lock);

    /*
     * From here, refcounts are updated so be _careful_ when returning an error
     * after this point.
     */

    rcu_read_lock();

    /*
     * Lookup the stream just to make sure it does not exist in our internal
     * state. This should NEVER happen.
     */
    lttng_ht_lookup(ht, &stream->key, &iter);
    node = lttng_ht_iter_get_node_u64(&iter);
    assert(!node);

    /*
     * When nb_init_stream_left reaches 0, we don't need to trigger any action
     * in terms of destroying the associated channel, because the action that
     * causes the count to become 0 also causes a stream to be added. The
     * channel deletion will thus be triggered by the following removal of this
     * stream.
     */
    if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
        /* Increment refcount before decrementing nb_init_stream_left */
        uatomic_inc(&stream->chan->refcount);
        uatomic_dec(&stream->chan->nb_init_stream_left);
    }

    lttng_ht_add_unique_u64(ht, &stream->node);

    lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
            &stream->node_channel_id);

    /*
     * Add stream to the stream_list_ht of the consumer data. No need to steal
     * the key since the HT does not use it and we allow to add redundant keys
     * into this table.
     */
    lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

    rcu_read_unlock();

    pthread_mutex_unlock(&stream->lock);
    pthread_mutex_unlock(&stream->chan->lock);
    pthread_mutex_unlock(&stream->chan->timer_lock);
    pthread_mutex_unlock(&consumer_data.lock);
}
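/*
 * Note on locking: consumer_del_metadata_stream() and
 * consumer_add_metadata_stream() above acquire their locks in the same order
 * (consumer_data.lock, then the channel lock, then the channel timer lock
 * where applicable, then the stream lock). Keeping this ordering consistent
 * across threads is what prevents deadlocks between the metadata and data
 * paths.
 */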
/*
 * Delete data streams that are flagged for deletion (endpoint_status).
 */
static void validate_endpoint_status_data_stream(void)
{
    struct lttng_ht_iter iter;
    struct lttng_consumer_stream *stream;

    DBG("Consumer delete flagged data stream");

    rcu_read_lock();
    cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
        /* Validate delete flag of the stream */
        if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
            continue;
        }
        /* Delete it right now */
        consumer_del_stream(stream, data_ht);
    }
    rcu_read_unlock();
}
/*
 * Delete metadata streams that are flagged for deletion (endpoint_status).
 */
static void validate_endpoint_status_metadata_stream(
        struct lttng_poll_event *pollset)
{
    struct lttng_ht_iter iter;
    struct lttng_consumer_stream *stream;

    DBG("Consumer delete flagged metadata stream");

    assert(pollset);

    rcu_read_lock();
    cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
        /* Validate delete flag of the stream */
        if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
            continue;
        }
        /*
         * Remove from pollset so the metadata thread can continue without
         * blocking on a deleted stream.
         */
        lttng_poll_del(pollset, stream->wait_fd);

        /* Delete it right now */
        consumer_del_metadata_stream(stream, metadata_ht);
    }
    rcu_read_unlock();
}
/*
 * Thread polls on metadata file descriptor and write them on disk or on the
 * network.
 */
void *consumer_thread_metadata_poll(void *data)
{
    int ret, i, pollfd, err = -1;
    uint32_t revents, nb_fd;
    struct lttng_consumer_stream *stream = NULL;
    struct lttng_ht_iter iter;
    struct lttng_ht_node_u64 *node;
    struct lttng_poll_event events;
    struct lttng_consumer_local_data *ctx = data;
    ssize_t len;

    rcu_register_thread();

    health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);

    if (testpoint(consumerd_thread_metadata)) {
        goto error_testpoint;
    }

    health_code_update();

    DBG("Thread metadata poll started");

    /* Size is set to 1 for the consumer_metadata pipe */
    ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
    if (ret < 0) {
        ERR("Poll set creation failed");
        goto end_poll;
    }

    ret = lttng_poll_add(&events,
            lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
    if (ret < 0) {
        goto end;
    }

    /* Main loop */
    DBG("Metadata main loop started");

    while (1) {
restart:
        health_code_update();
        health_poll_entry();
        DBG("Metadata poll wait");
        ret = lttng_poll_wait(&events, -1);
        DBG("Metadata poll return from wait with %d fd(s)",
                LTTNG_POLL_GETNB(&events));
        health_poll_exit();
        DBG("Metadata event caught in thread");
        if (ret < 0) {
            if (errno == EINTR) {
                ERR("Poll EINTR caught");
                goto restart;
            }
            if (LTTNG_POLL_GETNB(&events) == 0) {
                err = 0;    /* All is OK */
            }
            goto end;
        }

        nb_fd = ret;

        /* From here, the event is a metadata wait fd */
        for (i = 0; i < nb_fd; i++) {
            health_code_update();

            revents = LTTNG_POLL_GETEV(&events, i);
            pollfd = LTTNG_POLL_GETFD(&events, i);

            if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                if (revents & LPOLLIN) {
                    ssize_t pipe_len;

                    pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                            &stream, sizeof(stream));
                    if (pipe_len < sizeof(stream)) {
                        if (pipe_len < 0) {
                            PERROR("read metadata stream");
                        }
                        /*
                         * Remove the pipe from the poll set and continue the
                         * loop since there might be data to consume.
                         */
                        lttng_poll_del(&events,
                                lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
                        lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                        continue;
                    }

                    /* A NULL stream means that the state has changed. */
                    if (stream == NULL) {
                        /* Check for deleted streams. */
                        validate_endpoint_status_metadata_stream(&events);
                        goto restart;
                    }

                    DBG("Adding metadata stream %d to poll set",
                            stream->wait_fd);

                    /* Add metadata stream to the global poll events list */
                    lttng_poll_add(&events, stream->wait_fd,
                            LPOLLIN | LPOLLPRI | LPOLLHUP);
                } else if (revents & (LPOLLERR | LPOLLHUP)) {
                    DBG("Metadata thread pipe hung up");
                    /*
                     * Remove the pipe from the poll set and continue the loop
                     * since there might be data to consume.
                     */
                    lttng_poll_del(&events,
                            lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
                    lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                    continue;
                } else {
                    ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                    goto end;
                }

                /* Handle other stream */
                continue;
            }

            rcu_read_lock();
            {
                uint64_t tmp_id = (uint64_t) pollfd;

                lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
            }
            node = lttng_ht_iter_get_node_u64(&iter);
            assert(node);

            stream = caa_container_of(node, struct lttng_consumer_stream,
                    node);

            if (revents & (LPOLLIN | LPOLLPRI)) {
                /* Get the data out of the metadata file descriptor */
                DBG("Metadata available on fd %d", pollfd);
                assert(stream->wait_fd == pollfd);

                do {
                    health_code_update();

                    len = ctx->on_buffer_ready(stream, ctx);
                    /*
                     * We don't check the return value here since if we get
                     * a negative len, it means an error occurred thus we
                     * simply remove it from the poll set and free the
                     * stream.
                     */
                } while (len > 0);

                /* It's ok to have an unavailable sub-buffer */
                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                    /* Clean up stream from consumer and free it. */
                    lttng_poll_del(&events, stream->wait_fd);
                    consumer_del_metadata_stream(stream, metadata_ht);
                }
            } else if (revents & (LPOLLERR | LPOLLHUP)) {
                DBG("Metadata fd %d is hup|err.", pollfd);
                if (!stream->hangup_flush_done
                        && (consumer_data.type == LTTNG_CONSUMER32_UST
                            || consumer_data.type == LTTNG_CONSUMER64_UST)) {
                    DBG("Attempting to flush and consume the UST buffers");
                    lttng_ustconsumer_on_stream_hangup(stream);

                    /* We just flushed the stream now read it. */
                    do {
                        health_code_update();

                        len = ctx->on_buffer_ready(stream, ctx);
                        /*
                         * We don't check the return value here since if we get
                         * a negative len, it means an error occurred thus we
                         * simply remove it from the poll set and free the
                         * stream.
                         */
                    } while (len > 0);
                }

                lttng_poll_del(&events, stream->wait_fd);
                /*
                 * This call updates the channel states, closes file
                 * descriptors and securely frees the stream.
                 */
                consumer_del_metadata_stream(stream, metadata_ht);
            } else {
                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                rcu_read_unlock();
                goto end;
            }
            /* Release RCU lock for the stream looked up */
            rcu_read_unlock();
        }
    }

    /* All is OK */
    err = 0;
end:
    DBG("Metadata poll thread exiting");

    lttng_poll_clean(&events);
end_poll:
error_testpoint:
    if (err) {
        health_error();
        ERR("Health error occurred in %s", __func__);
    }
    health_unregister(health_consumerd);
    rcu_unregister_thread();
    return NULL;
}
/*
 * This thread polls the fds in the set to consume the data and write
 * it to tracefile if necessary.
 */
void *consumer_thread_data_poll(void *data)
{
    int num_rdy, num_hup, high_prio, ret, i, err = -1;
    struct pollfd *pollfd = NULL;
    /* local view of the streams */
    struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
    /* local view of consumer_data.fds_count */
    int nb_fd = 0;
    /* 2 for the consumer_data_pipe and wake up pipe */
    const int nb_pipes_fd = 2;
    /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
    int nb_inactive_fd = 0;
    struct lttng_consumer_local_data *ctx = data;
    ssize_t len;

    rcu_register_thread();

    health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);

    if (testpoint(consumerd_thread_data)) {
        goto error_testpoint;
    }

    health_code_update();

    local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
    if (local_stream == NULL) {
        PERROR("local_stream malloc");
        goto end;
    }

    while (1) {
        health_code_update();

        high_prio = 0;
        num_hup = 0;

        /*
         * the fds set has been updated, we need to update our
         * local array as well
         */
        pthread_mutex_lock(&consumer_data.lock);
        if (consumer_data.need_update) {
            free(pollfd);
            pollfd = NULL;

            free(local_stream);
            local_stream = NULL;

            /* Allocate for all fds */
            pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
            if (pollfd == NULL) {
                PERROR("pollfd malloc");
                pthread_mutex_unlock(&consumer_data.lock);
                goto end;
            }

            local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
                    sizeof(struct lttng_consumer_stream *));
            if (local_stream == NULL) {
                PERROR("local_stream malloc");
                pthread_mutex_unlock(&consumer_data.lock);
                goto end;
            }
            ret = update_poll_array(ctx, &pollfd, local_stream,
                    data_ht, &nb_inactive_fd);
            if (ret < 0) {
                ERR("Error in allocating pollfd or local_outfds");
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                pthread_mutex_unlock(&consumer_data.lock);
                goto end;
            }
            nb_fd = ret;
            consumer_data.need_update = 0;
        }
        pthread_mutex_unlock(&consumer_data.lock);

        /* No FDs and consumer_quit, consumer_cleanup the thread */
        if (nb_fd == 0 && nb_inactive_fd == 0 &&
                CMM_LOAD_SHARED(consumer_quit) == 1) {
            err = 0;    /* All is OK */
            goto end;
        }
        /* poll on the array of fds */
restart:
        DBG("polling on %d fd", nb_fd + nb_pipes_fd);
        if (testpoint(consumerd_thread_data_poll)) {
            goto end;
        }
        health_poll_entry();
        num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
        health_poll_exit();
        DBG("poll num_rdy : %d", num_rdy);
        if (num_rdy == -1) {
            /*
             * Restart interrupted system call.
             */
            if (errno == EINTR) {
                goto restart;
            }
            PERROR("Poll error");
            lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
            goto end;
        } else if (num_rdy == 0) {
            DBG("Polling thread timed out");
            goto end;
        }

        if (caa_unlikely(data_consumption_paused)) {
            DBG("Data consumption paused, sleeping...");
            sleep(1);
            goto restart;
        }

        /*
         * If the consumer_data_pipe triggered poll go directly to the
         * beginning of the loop to update the array. We want to prioritize
         * array update over low-priority reads.
         */
        if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
            ssize_t pipe_readlen;

            DBG("consumer_data_pipe wake up");
            pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
                    &new_stream, sizeof(new_stream));
            if (pipe_readlen < sizeof(new_stream)) {
                PERROR("Consumer data pipe");
                /* Continue so we can at least handle the current stream(s). */
                continue;
            }

            /*
             * If the stream is NULL, just ignore it. It's also possible that
             * the sessiond poll thread changed the consumer_quit state and is
             * waking us up to test it.
             */
            if (new_stream == NULL) {
                validate_endpoint_status_data_stream();
                continue;
            }

            /* Continue to update the local streams and handle prio ones */
            continue;
        }

        /* Handle wakeup pipe. */
        if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
            char dummy;
            ssize_t pipe_readlen;

            pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
                    sizeof(dummy));
            if (pipe_readlen < 0) {
                PERROR("Consumer data wakeup pipe");
            }
            /* We've been awakened to handle stream(s). */
            ctx->has_wakeup = 0;
        }

        /* Take care of high priority channels first. */
        for (i = 0; i < nb_fd; i++) {
            health_code_update();

            if (local_stream[i] == NULL) {
                continue;
            }
            if (pollfd[i].revents & POLLPRI) {
                DBG("Urgent read on fd %d", pollfd[i].fd);
                high_prio = 1;
                len = ctx->on_buffer_ready(local_stream[i], ctx);
                /* it's ok to have an unavailable sub-buffer */
                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                    /* Clean the stream and free it. */
                    consumer_del_stream(local_stream[i], data_ht);
                    local_stream[i] = NULL;
                } else if (len > 0) {
                    local_stream[i]->data_read = 1;
                }
            }
        }

        /*
         * If we read high prio channel in this loop, try again
         * for more high prio data.
         */
        if (high_prio) {
            continue;
        }

        /* Take care of low priority channels. */
        for (i = 0; i < nb_fd; i++) {
            health_code_update();

            if (local_stream[i] == NULL) {
                continue;
            }
            if ((pollfd[i].revents & POLLIN) ||
                    local_stream[i]->hangup_flush_done ||
                    local_stream[i]->has_data) {
                DBG("Normal read on fd %d", pollfd[i].fd);
                len = ctx->on_buffer_ready(local_stream[i], ctx);
                /* it's ok to have an unavailable sub-buffer */
                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                    /* Clean the stream and free it. */
                    consumer_del_stream(local_stream[i], data_ht);
                    local_stream[i] = NULL;
                } else if (len > 0) {
                    local_stream[i]->data_read = 1;
                }
            }
        }

        /* Handle hangup and errors */
        for (i = 0; i < nb_fd; i++) {
            health_code_update();

            if (local_stream[i] == NULL) {
                continue;
            }
            if (!local_stream[i]->hangup_flush_done
                    && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
                    && (consumer_data.type == LTTNG_CONSUMER32_UST
                        || consumer_data.type == LTTNG_CONSUMER64_UST)) {
                DBG("fd %d is hup|err|nval. Attempting flush and read.",
                        pollfd[i].fd);
                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                /* Attempt read again, for the data we just flushed. */
                local_stream[i]->data_read = 1;
            }
            /*
             * If the poll flag is HUP/ERR/NVAL and we have
             * read no data in this pass, we can remove the
             * stream from its hash table.
             */
            if ((pollfd[i].revents & POLLHUP)) {
                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                if (!local_stream[i]->data_read) {
                    consumer_del_stream(local_stream[i], data_ht);
                    local_stream[i] = NULL;
                    num_hup++;
                }
            } else if (pollfd[i].revents & POLLERR) {
                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                if (!local_stream[i]->data_read) {
                    consumer_del_stream(local_stream[i], data_ht);
                    local_stream[i] = NULL;
                    num_hup++;
                }
            } else if (pollfd[i].revents & POLLNVAL) {
                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                if (!local_stream[i]->data_read) {
                    consumer_del_stream(local_stream[i], data_ht);
                    local_stream[i] = NULL;
                    num_hup++;
                }
            }
            if (local_stream[i] != NULL) {
                local_stream[i]->data_read = 0;
            }
        }
    }
    /* All is OK */
    err = 0;
end:
    DBG("polling thread exiting");
    free(pollfd);
    free(local_stream);

    /*
     * Close the write side of the pipe so epoll_wait() in
     * consumer_thread_metadata_poll can catch it. The thread is monitoring the
     * read side of the pipe. If we close them both, epoll_wait strangely does
     * not return and could create a endless wait period if the pipe is the
     * only tracked fd in the poll set. The thread will take care of closing
     * the read side.
     */
    (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);

error_testpoint:
    if (err) {
        health_error();
        ERR("Health error occurred in %s", __func__);
    }
    health_unregister(health_consumerd);

    rcu_unregister_thread();
    return NULL;
}
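/*
 * Note on the pollfd layout used by consumer_thread_data_poll(): the first
 * nb_fd entries are the monitored stream fds, pollfd[nb_fd] is the
 * consumer_data_pipe (stream additions / state changes) and pollfd[nb_fd + 1]
 * is the wakeup pipe, which is why the array is sized with nb_pipes_fd = 2
 * extra slots.
 */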
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 */
void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
    struct lttng_ht *ht;
    struct lttng_consumer_stream *stream;
    struct lttng_ht_iter iter;

    ht = consumer_data.stream_per_chan_id_ht;

    rcu_read_lock();
    cds_lfht_for_each_entry_duplicate(ht->ht,
            ht->hash_fct(&channel->key, lttng_ht_seed),
            ht->match_fct, &channel->key,
            &iter.iter, stream, node_channel_id.node) {
        /*
         * Protect against teardown with mutex.
         */
        pthread_mutex_lock(&stream->lock);
        if (cds_lfht_is_node_deleted(&stream->node.node)) {
            goto next;
        }

        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
            break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
            if (stream->metadata_flag) {
                /* Safe and protected by the stream lock. */
                lttng_ustconsumer_close_metadata(stream->chan);
            } else {
                /*
                 * Note: a mutex is taken internally within
                 * liblttng-ust-ctl to protect timer wakeup_fd
                 * use from concurrent close.
                 */
                lttng_ustconsumer_close_stream_wakeup(stream);
            }
            break;
        default:
            ERR("Unknown consumer_data type");
            assert(0);
        }
    next:
        pthread_mutex_unlock(&stream->lock);
    }
    rcu_read_unlock();
}
static void destroy_channel_ht(struct lttng_ht *ht)
{
    int ret;
    struct lttng_ht_iter iter;
    struct lttng_consumer_channel *channel;

    if (ht == NULL) {
        return;
    }

    rcu_read_lock();
    cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
        ret = lttng_ht_del(ht, &iter);
    }
    rcu_read_unlock();

    lttng_ht_destroy(ht);
}
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 */
void *consumer_thread_channel_poll(void *data)
{
    int ret, i, pollfd, err = -1;
    uint32_t revents, nb_fd;
    struct lttng_consumer_channel *chan = NULL;
    struct lttng_ht_iter iter;
    struct lttng_ht_node_u64 *node;
    struct lttng_poll_event events;
    struct lttng_consumer_local_data *ctx = data;
    struct lttng_ht *channel_ht;

    rcu_register_thread();

    health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

    if (testpoint(consumerd_thread_channel)) {
        goto error_testpoint;
    }

    health_code_update();

    channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!channel_ht) {
        /* ENOMEM at this point. Better to bail out. */
        goto end_ht;
    }

    DBG("Thread channel poll started");

    /* Size is set to 1 for the consumer_channel pipe */
    ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
    if (ret < 0) {
        ERR("Poll set creation failed");
        goto end_poll;
    }

    ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
    if (ret < 0) {
        goto end;
    }

    /* Main loop */
    DBG("Channel main loop started");

    while (1) {
restart:
        health_code_update();
        DBG("Channel poll wait");
        health_poll_entry();
        ret = lttng_poll_wait(&events, -1);
        DBG("Channel poll return from wait with %d fd(s)",
                LTTNG_POLL_GETNB(&events));
        health_poll_exit();
        DBG("Channel event caught in thread");
        if (ret < 0) {
            if (errno == EINTR) {
                ERR("Poll EINTR caught");
                goto restart;
            }
            if (LTTNG_POLL_GETNB(&events) == 0) {
                err = 0;    /* All is OK */
            }
            goto end;
        }

        nb_fd = ret;

        /* From here, the event is a channel wait fd */
        for (i = 0; i < nb_fd; i++) {
            health_code_update();

            revents = LTTNG_POLL_GETEV(&events, i);
            pollfd = LTTNG_POLL_GETFD(&events, i);

            if (pollfd == ctx->consumer_channel_pipe[0]) {
                if (revents & LPOLLIN) {
                    enum consumer_channel_action action;
                    uint64_t key;

                    ret = read_channel_pipe(ctx, &chan, &key, &action);
                    if (ret <= 0) {
                        if (ret < 0) {
                            ERR("Error reading channel pipe");
                        }
                        lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
                        continue;
                    }

                    switch (action) {
                    case CONSUMER_CHANNEL_ADD:
                        DBG("Adding channel %d to poll set",
                                chan->wait_fd);

                        lttng_ht_node_init_u64(&chan->wait_fd_node,
                                chan->wait_fd);
                        rcu_read_lock();
                        lttng_ht_add_unique_u64(channel_ht,
                                &chan->wait_fd_node);
                        rcu_read_unlock();
                        /* Add channel to the global poll events list */
                        lttng_poll_add(&events, chan->wait_fd,
                                LPOLLERR | LPOLLHUP);
                        break;
                    case CONSUMER_CHANNEL_DEL:
                    {
                        /*
                         * This command should never be called if the channel
                         * has streams monitored by either the data or metadata
                         * thread. The consumer only notifies this thread with a
                         * channel del. command if it receives a destroy
                         * channel command from the session daemon that send it
                         * if a command prior to the GET_CHANNEL failed.
                         */

                        rcu_read_lock();
                        chan = consumer_find_channel(key);
                        if (!chan) {
                            rcu_read_unlock();
                            ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
                            break;
                        }
                        lttng_poll_del(&events, chan->wait_fd);
                        iter.iter.node = &chan->wait_fd_node.node;
                        ret = lttng_ht_del(channel_ht, &iter);

                        switch (consumer_data.type) {
                        case LTTNG_CONSUMER_KERNEL:
                            break;
                        case LTTNG_CONSUMER32_UST:
                        case LTTNG_CONSUMER64_UST:
                            health_code_update();
                            /* Destroy streams that might have been left in the stream list. */
                            clean_channel_stream_list(chan);
                            break;
                        default:
                            ERR("Unknown consumer_data type");
                            assert(0);
                        }

                        /*
                         * Release our own refcount. Force channel deletion even if
                         * streams were not initialized.
                         */
                        if (!uatomic_sub_return(&chan->refcount, 1)) {
                            consumer_del_channel(chan);
                        }
                        rcu_read_unlock();
                        goto restart;
                    }
                    case CONSUMER_CHANNEL_QUIT:
                        /*
                         * Remove the pipe from the poll set and continue the
                         * loop since there might be data to consume.
                         */
                        lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
                        continue;
                    default:
                        ERR("Unknown action");
                        break;
                    }
                } else if (revents & (LPOLLERR | LPOLLHUP)) {
                    DBG("Channel thread pipe hung up");
                    /*
                     * Remove the pipe from the poll set and continue the loop
                     * since there might be data to consume.
                     */
                    lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
                    continue;
                } else {
                    ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                    goto end;
                }

                /* Handle other stream */
                continue;
            }

            rcu_read_lock();
            {
                uint64_t tmp_id = (uint64_t) pollfd;

                lttng_ht_lookup(channel_ht, &tmp_id, &iter);
            }
            node = lttng_ht_iter_get_node_u64(&iter);
            assert(node);

            chan = caa_container_of(node, struct lttng_consumer_channel,
                    wait_fd_node);

            /* Check for error event */
            if (revents & (LPOLLERR | LPOLLHUP)) {
                DBG("Channel fd %d is hup|err.", pollfd);

                lttng_poll_del(&events, chan->wait_fd);
                ret = lttng_ht_del(channel_ht, &iter);

                /*
                 * This will close the wait fd for each stream associated to
                 * this channel AND monitored by the data/metadata thread thus
                 * will be clean by the right thread.
                 */
                consumer_close_channel_streams(chan);

                /* Release our own refcount */
                if (!uatomic_sub_return(&chan->refcount, 1)
                        && !uatomic_read(&chan->nb_init_stream_left)) {
                    consumer_del_channel(chan);
                }
            } else {
                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                rcu_read_unlock();
                goto end;
            }

            /* Release RCU lock for the channel looked up */
            rcu_read_unlock();
        }
    }

    /* All is OK */
    err = 0;
end:
    lttng_poll_clean(&events);
end_poll:
    destroy_channel_ht(channel_ht);
end_ht:
error_testpoint:
    DBG("Channel poll thread exiting");
    if (err) {
        health_error();
        ERR("Health error occurred in %s", __func__);
    }
    health_unregister(health_consumerd);
    rcu_unregister_thread();
    return NULL;
}
static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
        struct pollfd *sockpoll, int client_socket)
{
    int ret;

    assert(ctx);
    assert(sockpoll);

    ret = lttng_consumer_poll_socket(sockpoll);
    if (ret) {
        goto error;
    }
    DBG("Metadata connection on client_socket");

    /* Blocking call, waiting for transmission */
    ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
    if (ctx->consumer_metadata_socket < 0) {
        WARN("On accept metadata");
        ret = -1;
        goto error;
    }
    ret = 0;

error:
    return ret;
}
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 */
void *consumer_thread_sessiond_poll(void *data)
{
    int sock = -1, client_socket, ret, err = -1;
    /*
     * structure to poll for incoming data on communication socket avoids
     * making blocking sockets.
     */
    struct pollfd consumer_sockpoll[2];
    struct lttng_consumer_local_data *ctx = data;

    rcu_register_thread();

    health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

    if (testpoint(consumerd_thread_sessiond)) {
        goto error_testpoint;
    }

    health_code_update();

    DBG("Creating command socket %s", ctx->consumer_command_sock_path);
    unlink(ctx->consumer_command_sock_path);
    client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
    if (client_socket < 0) {
        ERR("Cannot create command socket");
        goto end;
    }

    ret = lttcomm_listen_unix_sock(client_socket);
    if (ret < 0) {
        goto end;
    }

    DBG("Sending ready command to lttng-sessiond");
    ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
    /* return < 0 on error, but == 0 is not fatal */
    if (ret < 0) {
        ERR("Error sending ready command to lttng-sessiond");
        goto end;
    }

    /* prepare the FDs to poll : to client socket and the should_quit pipe */
    consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
    consumer_sockpoll[0].events = POLLIN | POLLPRI;
    consumer_sockpoll[1].fd = client_socket;
    consumer_sockpoll[1].events = POLLIN | POLLPRI;

    ret = lttng_consumer_poll_socket(consumer_sockpoll);
    if (ret) {
        if (ret > 0) {
            /* should exit */
            err = 0;
        }
        goto end;
    }
    DBG("Connection on client_socket");

    /* Blocking call, waiting for transmission */
    sock = lttcomm_accept_unix_sock(client_socket);
    if (sock < 0) {
        WARN("On accept");
        goto end;
    }

    /*
     * Setup metadata socket which is the second socket connection on the
     * command unix socket.
     */
    ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
    if (ret) {
        if (ret > 0) {
            /* should exit */
            err = 0;
        }
        goto end;
    }

    /* This socket is not useful anymore. */
    ret = close(client_socket);
    if (ret < 0) {
        PERROR("close client_socket");
    }
    client_socket = -1;

    /* update the polling structure to poll on the established socket */
    consumer_sockpoll[1].fd = sock;
    consumer_sockpoll[1].events = POLLIN | POLLPRI;

    while (1) {
        health_code_update();

        health_poll_entry();
        ret = lttng_consumer_poll_socket(consumer_sockpoll);
        health_poll_exit();
        if (ret) {
            if (ret > 0) {
                /* should exit */
                err = 0;
            }
            goto end;
        }
        DBG("Incoming command on sock");
        ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
        if (ret <= 0) {
            /*
             * This could simply be a session daemon quitting. Don't output
             * ERR() here.
             */
            DBG("Communication interrupted on command socket");
            err = 0;
            goto end;
        }
        if (CMM_LOAD_SHARED(consumer_quit)) {
            DBG("consumer_thread_receive_fds received quit from signal");
            err = 0;    /* All is OK */
            goto end;
        }
        DBG("received command on sock");
    }
    /* All is OK */
    err = 0;

end:
    DBG("Consumer thread sessiond poll exiting");

    /*
     * Close metadata streams since the producer is the session daemon which
     * exited.
     *
     * NOTE: for now, this only applies to the UST tracer.
     */
    lttng_consumer_close_all_metadata();

    /*
     * when all fds have hung up, the polling thread
     * can exit cleanly
     */
    CMM_STORE_SHARED(consumer_quit, 1);

    /*
     * Notify the data poll thread to poll back again and test the
     * consumer_quit state that we just set so to quit gracefully.
     */
    notify_thread_lttng_pipe(ctx->consumer_data_pipe);

    notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);

    notify_health_quit_pipe(health_quit_pipe);

    /* Cleaning up possibly open sockets. */
    if (sock >= 0) {
        ret = close(sock);
        if (ret < 0) {
            PERROR("close sock sessiond poll");
        }
    }
    if (client_socket >= 0) {
        ret = close(client_socket);
        if (ret < 0) {
            PERROR("close client_socket sessiond poll");
        }
    }

error_testpoint:
    if (err) {
        health_error();
        ERR("Health error occurred in %s", __func__);
    }
    health_unregister(health_consumerd);

    rcu_unregister_thread();
    return NULL;
}
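/*
 * Shutdown note: when the sessiond socket is closed, the thread above first
 * closes all metadata streams, then sets consumer_quit and notifies the data
 * pipe, the channel pipe and the health quit pipe so that every other thread
 * wakes up, observes consumer_quit and exits on its own.
 */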
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        struct lttng_consumer_local_data *ctx)
{
    ssize_t ret;

    pthread_mutex_lock(&stream->chan->lock);
    pthread_mutex_lock(&stream->lock);
    if (stream->metadata_flag) {
        pthread_mutex_lock(&stream->metadata_rdv_lock);
    }

    switch (consumer_data.type) {
    case LTTNG_CONSUMER_KERNEL:
        ret = lttng_kconsumer_read_subbuffer(stream, ctx);
        break;
    case LTTNG_CONSUMER32_UST:
    case LTTNG_CONSUMER64_UST:
        ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
        break;
    default:
        ERR("Unknown consumer_data type");
        assert(0);
        ret = -ENOSYS;
        break;
    }

    if (stream->metadata_flag) {
        pthread_cond_broadcast(&stream->metadata_rdv);
        pthread_mutex_unlock(&stream->metadata_rdv_lock);
    }
    pthread_mutex_unlock(&stream->lock);
    pthread_mutex_unlock(&stream->chan->lock);

    return ret;
}
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
    switch (consumer_data.type) {
    case LTTNG_CONSUMER_KERNEL:
        return lttng_kconsumer_on_recv_stream(stream);
    case LTTNG_CONSUMER32_UST:
    case LTTNG_CONSUMER64_UST:
        return lttng_ustconsumer_on_recv_stream(stream);
    default:
        ERR("Unknown consumer_data type");
        assert(0);
        return -ENOSYS;
    }
}
/*
 * Allocate and set consumer data hash tables.
 */
int lttng_consumer_init(void)
{
    consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!consumer_data.channel_ht) {
        goto error;
    }

    consumer_data.channels_by_session_id_ht =
            lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!consumer_data.channels_by_session_id_ht) {
        goto error;
    }

    consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!consumer_data.relayd_ht) {
        goto error;
    }

    consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!consumer_data.stream_list_ht) {
        goto error;
    }

    consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!consumer_data.stream_per_chan_id_ht) {
        goto error;
    }

    data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!data_ht) {
        goto error;
    }

    metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
    if (!metadata_ht) {
        goto error;
    }

    consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
    if (!consumer_data.chunk_registry) {
        goto error;
    }

    return 0;

error:
    return -1;
}
/*
 * Process the ADD_RELAYD command received by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        struct lttng_consumer_local_data *ctx, int sock,
        struct pollfd *consumer_sockpoll,
        struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
        uint64_t relayd_session_id)
{
    int fd = -1, ret = -1, relayd_created = 0;
    enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
    struct consumer_relayd_sock_pair *relayd = NULL;

    assert(ctx);
    assert(relayd_sock);

    DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

    /* Get relayd reference if exists. */
    relayd = consumer_find_relayd(net_seq_idx);
    if (relayd == NULL) {
        assert(sock_type == LTTNG_STREAM_CONTROL);
        /* Not found. Allocate one. */
        relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
        if (relayd == NULL) {
            ret_code = LTTCOMM_CONSUMERD_ENOMEM;
            goto error;
        }
        relayd->sessiond_session_id = sessiond_id;
        relayd_created = 1;

        /*
         * This code path MUST continue to the consumer send status message so
         * we can notify the session daemon and continue our work without
         * killing everything.
         */
    } else {
        /*
         * relayd key should never be found for control socket.
         */
        assert(sock_type != LTTNG_STREAM_CONTROL);
    }

    /* First send a status message before receiving the fds. */
    ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
    if (ret < 0) {
        /* Somehow, the session daemon is not responding anymore. */
        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
        goto error_nosignal;
    }

    /* Poll on consumer socket. */
    ret = lttng_consumer_poll_socket(consumer_sockpoll);
    if (ret) {
        /* Needing to exit in the middle of a command: error. */
        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
        goto error_nosignal;
    }

    /* Get relayd socket from session daemon */
    ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
    if (ret != sizeof(fd)) {
        fd = -1;    /* Just in case it gets set with an invalid value. */

        /*
         * Failing to receive FDs might indicate a major problem such as
         * reaching a fd limit during the receive where the kernel returns a
         * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
         * don't take any chances and stop everything.
         *
         * XXX: Feature request #558 will fix that and avoid this possible
         * issue when reaching the fd limit.
         */
        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
        ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
        goto error;
    }

    /* Copy socket information and received FD */
    switch (sock_type) {
    case LTTNG_STREAM_CONTROL:
        /* Copy received lttcomm socket */
        lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
        ret = lttcomm_create_sock(&relayd->control_sock.sock);
        /* Handle create_sock error. */
        if (ret < 0) {
            ret_code = LTTCOMM_CONSUMERD_ENOMEM;
            goto error;
        }
        /*
         * Close the socket created internally by
         * lttcomm_create_sock, so we can replace it by the one
         * received from sessiond.
         */
        if (close(relayd->control_sock.sock.fd)) {
            PERROR("close");
        }

        /* Assign new file descriptor */
        relayd->control_sock.sock.fd = fd;
        /* Assign version values. */
        relayd->control_sock.major = relayd_sock->major;
        relayd->control_sock.minor = relayd_sock->minor;

        relayd->relayd_session_id = relayd_session_id;

        break;
    case LTTNG_STREAM_DATA:
        /* Copy received lttcomm socket */
        lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
        ret = lttcomm_create_sock(&relayd->data_sock.sock);
        /* Handle create_sock error. */
        if (ret < 0) {
            ret_code = LTTCOMM_CONSUMERD_ENOMEM;
            goto error;
        }
        /*
         * Close the socket created internally by
         * lttcomm_create_sock, so we can replace it by the one
         * received from sessiond.
         */
        if (close(relayd->data_sock.sock.fd)) {
            PERROR("close");
        }

        /* Assign new file descriptor */
        relayd->data_sock.sock.fd = fd;
        /* Assign version values. */
        relayd->data_sock.major = relayd_sock->major;
        relayd->data_sock.minor = relayd_sock->minor;
        break;
    default:
        ERR("Unknown relayd socket type (%d)", sock_type);
        ret_code = LTTCOMM_CONSUMERD_FATAL;
        goto error;
    }

    DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
            sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
            relayd->net_seq_idx, fd);
    /*
     * We gave the ownership of the fd to the relayd structure. Set the
     * fd to -1 so we don't call close() on it in the error path below.
     */
    fd = -1;

    /* We successfully added the socket. Send status back. */
    ret = consumer_send_status_msg(sock, ret_code);
    if (ret < 0) {
        /* Somehow, the session daemon is not responding anymore. */
        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
        goto error_nosignal;
    }

    /*
     * Add relayd socket pair to consumer data hashtable. If object already
     * exists or on error, the function gracefully returns.
     */
    add_relayd(relayd);

    /* All good! */
    return;

error:
    if (consumer_send_status_msg(sock, ret_code) < 0) {
        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
    }

error_nosignal:
    /* Close received socket if valid. */
    if (fd >= 0) {
        if (close(fd)) {
            PERROR("close received socket");
        }
    }

    if (relayd_created) {
        free(relayd);
    }
}
/*
 * Search for a relayd associated to the session id and return the reference.
 *
 * A rcu read side lock MUST be acquired before calling this function and held
 * until the relayd object is no longer necessary.
 */
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
    struct lttng_ht_iter iter;
    struct consumer_relayd_sock_pair *relayd = NULL;

    /* Iterate over all relayd since they are indexed by net_seq_idx. */
    cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
            node.node) {
        /*
         * Check by sessiond id which is unique here where the relayd session
         * id might not be when having multiple relayd.
         */
        if (relayd->sessiond_session_id == id) {
            /* Found the relayd. There can be only one per id. */
            goto found;
        }
    }

    return NULL;

found:
    return relayd;
}
/*
 * Check if, for a given session id, there is still data needed to be extracted
 * from the buffers.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
    int ret;
    struct lttng_ht_iter iter;
    struct lttng_ht *ht;
    struct lttng_consumer_stream *stream;
    struct consumer_relayd_sock_pair *relayd = NULL;
    int (*data_pending)(struct lttng_consumer_stream *);

    DBG("Consumer data pending command on session id %" PRIu64, id);

    rcu_read_lock();
    pthread_mutex_lock(&consumer_data.lock);

    switch (consumer_data.type) {
    case LTTNG_CONSUMER_KERNEL:
        data_pending = lttng_kconsumer_data_pending;
        break;
    case LTTNG_CONSUMER32_UST:
    case LTTNG_CONSUMER64_UST:
        data_pending = lttng_ustconsumer_data_pending;
        break;
    default:
        ERR("Unknown consumer data type");
        assert(0);
    }

    /* Ease our life a bit */
    ht = consumer_data.stream_list_ht;

    cds_lfht_for_each_entry_duplicate(ht->ht,
            ht->hash_fct(&id, lttng_ht_seed),
            ht->match_fct, &id,
            &iter.iter, stream, node_session_id.node) {
        pthread_mutex_lock(&stream->lock);

        /*
         * A removed node from the hash table indicates that the stream has
         * been deleted thus having a guarantee that the buffers are closed
         * on the consumer side. However, data can still be transmitted
         * over the network so don't skip the relayd check.
         */
        ret = cds_lfht_is_node_deleted(&stream->node.node);
        if (!ret) {
            /* Check the stream if there is data in the buffers. */
            ret = data_pending(stream);
            if (ret == 1) {
                pthread_mutex_unlock(&stream->lock);
                goto data_pending;
            }
        }

        pthread_mutex_unlock(&stream->lock);
    }

    relayd = find_relayd_by_session_id(id);
    if (relayd) {
        unsigned int is_data_inflight = 0;

        /* Send init command for data pending. */
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_begin_data_pending(&relayd->control_sock,
                relayd->relayd_session_id);
        if (ret < 0) {
            pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
            /* Communication error with the relayd so no data pending. */
            goto data_not_pending;
        }

        cds_lfht_for_each_entry_duplicate(ht->ht,
                ht->hash_fct(&id, lttng_ht_seed),
                ht->match_fct, &id,
                &iter.iter, stream, node_session_id.node) {
            if (stream->metadata_flag) {
                ret = relayd_quiescent_control(&relayd->control_sock,
                        stream->relayd_stream_id);
            } else {
                ret = relayd_data_pending(&relayd->control_sock,
                        stream->relayd_stream_id,
                        stream->next_net_seq_num - 1);
            }

            if (ret == 1) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                goto data_pending;
            } else if (ret < 0) {
                ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
                lttng_consumer_cleanup_relayd(relayd);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                goto data_not_pending;
            }
        }

        /* Send end command for data pending. */
        ret = relayd_end_data_pending(&relayd->control_sock,
                relayd->relayd_session_id, &is_data_inflight);
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        if (ret < 0) {
            ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
            lttng_consumer_cleanup_relayd(relayd);
            goto data_not_pending;
        }
        if (is_data_inflight) {
            goto data_pending;
        }
    }

    /*
     * Finding _no_ node in the hash table and no inflight data means that the
     * stream(s) have been removed thus data is guaranteed to be available for
     * analysis from the trace files.
     */

data_not_pending:
    /* Data is available to be read by a viewer. */
    pthread_mutex_unlock(&consumer_data.lock);
    rcu_read_unlock();
    return 0;

data_pending:
    /* Data is still being extracted from buffers. */
    pthread_mutex_unlock(&consumer_data.lock);
    rcu_read_unlock();
    return 1;
}
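/*
 * Note: when a relay daemon is involved, the data-pending check above is
 * bracketed by relayd_begin_data_pending() / relayd_end_data_pending() so
 * that the per-stream queries and the final "in-flight" answer refer to a
 * consistent view of what the relayd has received.
 */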
/*
 * Send a ret code status message to the sessiond daemon.
 *
 * Return the sendmsg() return value.
 */
int consumer_send_status_msg(int sock, int ret_code)
{
    struct lttcomm_consumer_status_msg msg;

    memset(&msg, 0, sizeof(msg));
    msg.ret_code = ret_code;

    return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
/*
 * Send a channel status message to the sessiond daemon.
 *
 * Return the sendmsg() return value.
 */
int consumer_send_status_channel(int sock,
        struct lttng_consumer_channel *channel)
{
    struct lttcomm_consumer_status_channel msg;

    assert(sock >= 0);

    memset(&msg, 0, sizeof(msg));
    if (!channel) {
        msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
    } else {
        msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        msg.key = channel->key;
        msg.stream_count = channel->streams.count;
    }

    return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        unsigned long produced_pos, uint64_t nb_packets_per_stream,
        uint64_t max_sb_size)
{
    unsigned long start_pos;

    if (!nb_packets_per_stream) {
        return consumed_pos;    /* Grab everything */
    }
    start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size);
    start_pos -= max_sb_size * nb_packets_per_stream;
    if ((long) (start_pos - consumed_pos) < 0) {
        return consumed_pos;    /* Grab everything */
    }
    return start_pos;
}
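/*
 * Example for consumer_get_consume_start_pos() (illustrative values, not
 * taken from the tracer): with max_sb_size = 4096, nb_packets_per_stream = 2,
 * produced_pos = 20000 and consumed_pos = 0, the aligned production position
 * is 16384, so start_pos = 16384 - 2 * 4096 = 8192 and only the last two
 * packets are grabbed. If start_pos were to fall behind consumed_pos, the
 * function falls back to consumed_pos and grabs everything available.
 */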
int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
{
    int ret = 0;

    switch (consumer_data.type) {
    case LTTNG_CONSUMER_KERNEL:
        ret = kernctl_buffer_flush(stream->wait_fd);
        if (ret < 0) {
            ERR("Failed to flush kernel stream");
            goto end;
        }
        break;
    case LTTNG_CONSUMER32_UST:
    case LTTNG_CONSUMER64_UST:
        lttng_ustctl_flush_buffer(stream, producer_active);
        break;
    default:
        ERR("Unknown consumer_data type");
        assert(0);
    }

end:
    return ret;
}
/*
 * Sample the rotate position for all the streams of a channel. If a stream
 * is already at the rotate position (produced == consumed), we flag it as
 * ready for rotation. The rotation of ready streams occurs after we have
 * replied to the session daemon that we have finished sampling the positions.
 * Must be called with RCU read-side lock held to ensure existence of channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        uint64_t key, uint64_t relayd_id, uint32_t metadata,
        struct lttng_consumer_local_data *ctx)
{
    int ret;
    struct lttng_consumer_stream *stream;
    struct lttng_ht_iter iter;
    struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
    struct lttng_dynamic_array stream_rotation_positions;
    uint64_t next_chunk_id, stream_count = 0;
    enum lttng_trace_chunk_status chunk_status;
    const bool is_local_trace = relayd_id == -1ULL;
    struct consumer_relayd_sock_pair *relayd = NULL;
    bool rotating_to_new_chunk = true;

    DBG("Consumer sample rotate position for channel %" PRIu64, key);

    lttng_dynamic_array_init(&stream_rotation_positions,
            sizeof(struct relayd_stream_rotation_position), NULL);

    rcu_read_lock();

    pthread_mutex_lock(&channel->lock);
    assert(channel->trace_chunk);
    chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
            &next_chunk_id);
    if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
        ret = -1;
        goto end_unlock_channel;
    }

    cds_lfht_for_each_entry_duplicate(ht->ht,
            ht->hash_fct(&channel->key, lttng_ht_seed),
            ht->match_fct, &channel->key, &iter.iter,
            stream, node_channel_id.node) {
        unsigned long consumed_pos;

        health_code_update();

        /*
         * Lock stream because we are about to change its state.
         */
        pthread_mutex_lock(&stream->lock);

        if (stream->trace_chunk == stream->chan->trace_chunk) {
            rotating_to_new_chunk = false;
        }

        ret = lttng_consumer_sample_snapshot_positions(stream);
        if (ret < 0) {
            ERR("Failed to sample snapshot position during channel rotation");
            goto end_unlock_stream;
        }

        ret = lttng_consumer_get_produced_snapshot(stream,
                &stream->rotate_position);
        if (ret < 0) {
            ERR("Failed to sample produced position during channel rotation");
            goto end_unlock_stream;
        }

        lttng_consumer_get_consumed_snapshot(stream,
                &consumed_pos);
        if (consumed_pos == stream->rotate_position) {
            stream->rotate_ready = true;
        }

        /*
         * Active flush; has no effect if the production position
         * is at a packet boundary.
         */
        ret = consumer_flush_buffer(stream, 1);
        if (ret < 0) {
            ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                    stream->key);
            goto end_unlock_stream;
        }

        if (!is_local_trace) {
            /*
             * The relay daemon control protocol expects a rotation
             * position as "the sequence number of the first packet
             * _after_ the current trace chunk".
             *
             * At the moment when the positions of the buffers are
             * sampled, the production position does not necessarily
             * sit at a packet boundary. The 'active' flush
             * operation above will push the production position to
             * the next packet boundary _if_ it is not already
             * sitting at such a boundary.
             *
             * Assuming a current production position that is not
             * on the bound of a packet, the 'target' sequence
             * number is:
             *   (consumed_pos / subbuffer_size) + 1
             * Note the '+ 1' to ensure the current packet is
             * part of the current trace chunk.
             *
             * However, if the production position is already at
             * a packet boundary, the '+ 1' is not necessary as the
             * last packet of the current chunk is already
             * complete.
             */
            const struct relayd_stream_rotation_position position = {
                .stream_id = stream->relayd_stream_id,
                .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
                        !!(stream->rotate_position % stream->max_sb_size),
            };

            ret = lttng_dynamic_array_add_element(
                    &stream_rotation_positions,
                    &position);
            if (ret) {
                ERR("Failed to allocate stream rotation position");
                goto end_unlock_stream;
            }
            stream_count++;
        }
        pthread_mutex_unlock(&stream->lock);
    }
    stream = NULL;
    pthread_mutex_unlock(&channel->lock);

    if (is_local_trace) {
        ret = 0;
        goto end;
    }

    relayd = consumer_find_relayd(relayd_id);
    if (!relayd) {
        ERR("Failed to find relayd %" PRIu64, relayd_id);
        ret = -1;
        goto end;
    }

    pthread_mutex_lock(&relayd->ctrl_sock_mutex);
    ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
            rotating_to_new_chunk ? &next_chunk_id : NULL,
            (const struct relayd_stream_rotation_position *)
                    stream_rotation_positions.buffer.data);
    pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
    if (ret < 0) {
        ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
                relayd->net_seq_idx);
        lttng_consumer_cleanup_relayd(relayd);
        goto end;
    }

    ret = 0;
    goto end;

end_unlock_stream:
    pthread_mutex_unlock(&stream->lock);
end_unlock_channel:
    pthread_mutex_unlock(&channel->lock);
end:
    rcu_read_unlock();
    lttng_dynamic_array_reset(&stream_rotation_positions);
    return ret;
}
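/*
 * Example for the rotate_at_seq_num computation above (illustrative values):
 * with max_sb_size = 4096 and a sampled rotate_position of 12288 (exactly
 * three packets), the target is 12288 / 4096 = 3 and no '+ 1' is added since
 * the position sits on a packet boundary; with rotate_position = 13000 the
 * target becomes (13000 / 4096) + 1 = 4, so the current, partially-filled
 * packet still belongs to the chunk being closed.
 */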
/*
 * Check if a stream is ready to be rotated after extracting it.
 *
 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
 * error. Stream lock must be held.
 */
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
{
    int ret;
    unsigned long consumed_pos;

    if (!stream->rotate_position && !stream->rotate_ready) {
        ret = 0;
        goto end;
    }

    if (stream->rotate_ready) {
        ret = 1;
        goto end;
    }

    /*
     * If we don't have the rotate_ready flag, check the consumed position
     * to determine if we need to rotate.
     */
    ret = lttng_consumer_sample_snapshot_positions(stream);
    if (ret < 0) {
        ERR("Taking snapshot positions");
        goto end;
    }

    ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
    if (ret < 0) {
        ERR("Consumed snapshot position");
        goto end;
    }

    /* Rotate position not reached yet (with check for overflow). */
    if ((long) (consumed_pos - stream->rotate_position) < 0) {
        ret = 0;
        goto end;
    }
    ret = 1;

end:
    return ret;
}
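/*
 * Note: the comparison above casts the unsigned difference to a signed long
 * so that the "rotate position not reached yet" test stays correct even if
 * the free-running positions wrap around, in the spirit of serial number
 * arithmetic.
 */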
/*
 * Reset the state for a stream after a rotation occurred.
 */
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
{
    stream->rotate_position = 0;
    stream->rotate_ready = false;
}
/*
 * Perform the rotation of a local stream file.
 */
int rotate_local_stream(struct lttng_consumer_local_data *ctx,
        struct lttng_consumer_stream *stream)
{
    int ret = 0;

    DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
            stream->key, stream->chan->key);
    stream->tracefile_size_current = 0;
    stream->tracefile_count_current = 0;

    if (stream->out_fd >= 0) {
        ret = close(stream->out_fd);
        if (ret) {
            PERROR("Failed to close stream out_fd of channel \"%s\"",
                    stream->chan->name);
        }
        stream->out_fd = -1;
    }

    if (stream->index_file) {
        lttng_index_file_put(stream->index_file);
        stream->index_file = NULL;
    }

    if (!stream->trace_chunk) {
        goto end;
    }

    ret = consumer_stream_create_output_files(stream, true);
end:
    return ret;
}
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * Return 0 on success, a negative number on error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        struct lttng_consumer_stream *stream)
{
    int ret;

    DBG("Consumer rotate stream %" PRIu64, stream->key);

    /*
     * Update the stream's 'current' chunk to the session's (channel)
     * now-current chunk.
     */
    lttng_trace_chunk_put(stream->trace_chunk);
    if (stream->chan->trace_chunk == stream->trace_chunk) {
        /*
         * A channel can be rotated and not have a "next" chunk
         * to transition to. In that case, the channel's "current chunk"
         * has not been closed yet, but it has not been updated to
         * a "next" trace chunk either. Hence, the stream, like its
         * parent channel, becomes part of no chunk and can't output
         * anything until a new trace chunk is created.
         */
        stream->trace_chunk = NULL;
    } else if (stream->chan->trace_chunk &&
            !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
        ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
        ret = -1;
        goto error;
    } else {
        /*
         * Update the stream's trace chunk to its parent channel's
         * current trace chunk.
         */
        stream->trace_chunk = stream->chan->trace_chunk;
    }

    if (stream->net_seq_idx == (uint64_t) -1ULL) {
        ret = rotate_local_stream(ctx, stream);
        if (ret < 0) {
            ERR("Failed to rotate stream, ret = %i", ret);
            goto error;
        }
    }

    if (stream->metadata_flag && stream->trace_chunk) {
        /*
         * If the stream has transitioned to a new trace
         * chunk, the metadata should be re-dumped to the
         * newest chunk.
         *
         * However, it is possible for a stream to transition to
         * a "no-chunk" state. This can happen if a rotation
         * occurs on an inactive session. In such cases, the metadata
         * regeneration will happen when the next trace chunk is
         * created.
         */
        ret = consumer_metadata_stream_dump(stream);
        if (ret) {
            goto error;
        }
    }
    lttng_consumer_reset_stream_rotate_state(stream);

    ret = 0;

error:
    return ret;
}
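/*
 * Note: lttng_consumer_rotate_stream() above drops the stream's reference on
 * its previous trace chunk first; the stream then either points to no chunk
 * (rotation on an inactive session) or takes a new reference on the
 * channel's current chunk, so chunk lifetimes are driven entirely by these
 * get/put pairs.
 */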
/*
 * Rotate all the ready streams now.
 *
 * This is especially important for low throughput streams that have already
 * been consumed, we cannot wait for their next packet to perform the
 * rotation.
 * Need to be called with RCU read-side lock held to ensure existence of
 * channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
        uint64_t key, struct lttng_consumer_local_data *ctx)
{
    int ret;
    struct lttng_consumer_stream *stream;
    struct lttng_ht_iter iter;
    struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;

    rcu_read_lock();

    DBG("Consumer rotate ready streams in channel %" PRIu64, key);

    cds_lfht_for_each_entry_duplicate(ht->ht,
            ht->hash_fct(&channel->key, lttng_ht_seed),
            ht->match_fct, &channel->key, &iter.iter,
            stream, node_channel_id.node) {
        health_code_update();

        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);

        if (!stream->rotate_ready) {
            pthread_mutex_unlock(&stream->lock);
            pthread_mutex_unlock(&stream->chan->lock);
            continue;
        }
        DBG("Consumer rotate ready stream %" PRIu64, stream->key);

        ret = lttng_consumer_rotate_stream(ctx, stream);
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        if (ret) {
            goto end;
        }
    }

    ret = 0;

end:
    rcu_read_unlock();
    return ret;
}
enum lttcomm_return_code lttng_consumer_init_command(
        struct lttng_consumer_local_data *ctx,
        const lttng_uuid sessiond_uuid)
{
    enum lttcomm_return_code ret;
    char uuid_str[UUID_STR_LEN];

    if (ctx->sessiond_uuid.is_set) {
        ret = LTTCOMM_CONSUMERD_ALREADY_SET;
        goto end;
    }

    ctx->sessiond_uuid.is_set = true;
    memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
    ret = LTTCOMM_CONSUMERD_SUCCESS;
    lttng_uuid_to_str(sessiond_uuid, uuid_str);
    DBG("Received session daemon UUID: %s", uuid_str);
end:
    return ret;
}
4413 enum lttcomm_return_code
lttng_consumer_create_trace_chunk(
4414 const uint64_t *relayd_id
, uint64_t session_id
,
4416 time_t chunk_creation_timestamp
,
4417 const char *chunk_override_name
,
4418 const struct lttng_credentials
*credentials
,
4419 struct lttng_directory_handle
*chunk_directory_handle
)
4422 enum lttcomm_return_code ret_code
= LTTCOMM_CONSUMERD_SUCCESS
;
4423 struct lttng_trace_chunk
*created_chunk
, *published_chunk
;
4424 enum lttng_trace_chunk_status chunk_status
;
4425 char relayd_id_buffer
[MAX_INT_DEC_LEN(*relayd_id
)];
4426 char creation_timestamp_buffer
[ISO8601_STR_LEN
];
4427 const char *relayd_id_str
= "(none)";
4428 const char *creation_timestamp_str
;
4429 struct lttng_ht_iter iter
;
4430 struct lttng_consumer_channel
*channel
;
4433 /* Only used for logging purposes. */
4434 ret
= snprintf(relayd_id_buffer
, sizeof(relayd_id_buffer
),
4435 "%" PRIu64
, *relayd_id
);
4436 if (ret
> 0 && ret
< sizeof(relayd_id_buffer
)) {
4437 relayd_id_str
= relayd_id_buffer
;
4439 relayd_id_str
= "(formatting error)";
4443 /* Local protocol error. */
4444 assert(chunk_creation_timestamp
);
4445 ret
= time_to_iso8601_str(chunk_creation_timestamp
,
4446 creation_timestamp_buffer
,
4447 sizeof(creation_timestamp_buffer
));
4448 creation_timestamp_str
= !ret
? creation_timestamp_buffer
:
4449 "(formatting error)";
4451 DBG("Consumer create trace chunk command: relay_id = %s"
4452 ", session_id = %" PRIu64
", chunk_id = %" PRIu64
4453 ", chunk_override_name = %s"
4454 ", chunk_creation_timestamp = %s",
4455 relayd_id_str
, session_id
, chunk_id
,
4456 chunk_override_name
? : "(none)",
4457 creation_timestamp_str
);
        /*
         * The trace chunk registry, as used by the consumer daemon, implicitly
         * owns the trace chunks. This is only needed in the consumer since
         * the consumer has no notion of a session beyond session IDs being
         * used to identify other objects.
         *
         * The lttng_trace_chunk_registry_publish() call below provides a
         * reference which is not released; it implicitly becomes the session
         * daemon's reference to the chunk in the consumer daemon.
         *
         * The lifetime of trace chunks in the consumer daemon is managed by
         * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
         * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
         */
        created_chunk = lttng_trace_chunk_create(chunk_id,
                        chunk_creation_timestamp);
        if (!created_chunk) {
                ERR("Failed to create trace chunk");
                ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                goto error;
        }

        if (chunk_override_name) {
                chunk_status = lttng_trace_chunk_override_name(created_chunk,
                                chunk_override_name);
                if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                        goto error;
                }
        }

        if (chunk_directory_handle) {
                chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
                                credentials);
                if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        ERR("Failed to set trace chunk credentials");
                        ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                        goto error;
                }
                /*
                 * The consumer daemon has no ownership of the chunk output
                 * directory.
                 */
                chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
                                chunk_directory_handle);
                if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        ERR("Failed to set trace chunk's directory handle");
                        ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                        goto error;
                }
        }

        published_chunk = lttng_trace_chunk_registry_publish_chunk(
                        consumer_data.chunk_registry, session_id,
                        created_chunk);
        lttng_trace_chunk_put(created_chunk);
        created_chunk = NULL;
        if (!published_chunk) {
                ERR("Failed to publish trace chunk");
                ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                goto error;
        }
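
        /* Set the newly-published chunk on all of this session's channels. */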
        rcu_read_lock();
        cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
                        consumer_data.channels_by_session_id_ht->hash_fct(
                                &session_id, lttng_ht_seed),
                        consumer_data.channels_by_session_id_ht->match_fct,
                        &session_id, &iter.iter, channel,
                        channels_by_session_id_ht_node.node) {
                ret = lttng_consumer_channel_set_trace_chunk(channel,
                                published_chunk);
                if (ret) {
                        /*
                         * Roll-back the creation of this chunk.
                         *
                         * This is important since the session daemon will
                         * assume that the creation of this chunk failed and
                         * will never ask for it to be closed, resulting
                         * in a leak and an inconsistent state for some
                         * channels.
                         */
                        enum lttcomm_return_code close_ret;

                        DBG("Failed to set new trace chunk on existing channels, rolling back");
                        close_ret = lttng_consumer_close_trace_chunk(relayd_id,
                                        session_id, chunk_id,
                                        chunk_creation_timestamp, NULL);
                        if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
                                ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
                                                ", chunk_id = %" PRIu64,
                                                session_id, chunk_id);
                        }

                        ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                        goto error_unlock;
                }
        }
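
        /*
         * When the session is streamed to a relay daemon, the chunk must also
         * be created on the relay daemon's end; a failure there rolls back
         * the chunk created locally.
         */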
        if (relayd_id) {
                struct consumer_relayd_sock_pair *relayd;

                relayd = consumer_find_relayd(*relayd_id);
                if (relayd) {
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        ret = relayd_create_trace_chunk(
                                        &relayd->control_sock, published_chunk);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                } else {
                        ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
                                        *relayd_id);
                }

                if (!relayd || ret) {
                        enum lttcomm_return_code close_ret;

                        close_ret = lttng_consumer_close_trace_chunk(relayd_id,
                                        session_id, chunk_id,
                                        chunk_creation_timestamp, NULL);
                        if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
                                ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
                                                ", chunk_id = %" PRIu64,
                                                session_id, chunk_id);
                        }

                        ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
                        goto error_unlock;
                }
        }
error_unlock:
        rcu_read_unlock();
error:
        /* Release the reference returned by the "publish" operation. */
        lttng_trace_chunk_put(published_chunk);
        lttng_trace_chunk_put(created_chunk);
        return ret_code;
}
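
/*
 * Close a trace chunk on behalf of the session daemon: record its close
 * timestamp and close command, detach it from any channel that still
 * references it and, when applicable, notify the relay daemon.
 */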
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id, time_t chunk_close_timestamp,
                const enum lttng_trace_chunk_command_type *close_command)
{
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
        const char *close_command_name = "none";
        struct lttng_ht_iter iter;
        struct lttng_consumer_channel *channel;
        enum lttng_trace_chunk_status chunk_status;

        if (relayd_id) {
                int ret;

                /* Only used for logging purposes. */
                ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
                                "%" PRIu64, *relayd_id);
                if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
                        relayd_id_str = relayd_id_buffer;
                } else {
                        relayd_id_str = "(formatting error)";
                }
        }
        if (close_command) {
                close_command_name = lttng_trace_chunk_command_type_get_name(
                                *close_command);
        }

        DBG("Consumer close trace chunk command: relayd_id = %s"
                        ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
                        ", close command = %s",
                        relayd_id_str, session_id, chunk_id,
                        close_command_name);
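
        /*
         * The registry look-up returns a reference to the chunk; it is
         * released, along with the session daemon's implicit reference,
         * at the end of this function.
         */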
        chunk = lttng_trace_chunk_registry_find_chunk(
                        consumer_data.chunk_registry, session_id, chunk_id);
        if (!chunk) {
                ERR("Failed to find chunk: session_id = %" PRIu64
                                ", chunk_id = %" PRIu64,
                                session_id, chunk_id);
                ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
                goto end;
        }

        chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
                        chunk_close_timestamp);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                goto end;
        }

        if (close_command) {
                chunk_status = lttng_trace_chunk_set_close_command(
                                chunk, *close_command);
                if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                        goto end;
                }
        }

        /*
         * chunk is now invalid to access as we no longer hold a reference to
         * it; it is only kept around to compare it (by address) to the
         * current chunk found in the session's channels.
         */
        rcu_read_lock();
        cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter,
                        channel, node.node) {
                int ret;

                /*
                 * Only change the channel's chunk to NULL if it still
                 * references the chunk being closed. The channel may
                 * reference a newer chunk in the case of a session
                 * rotation. When a session rotation occurs, the "next"
                 * chunk is created before the "current" chunk is closed.
                 */
                if (channel->trace_chunk != chunk) {
                        continue;
                }
                ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
                if (ret) {
                        /*
                         * Attempt to close the chunk on as many channels as
                         * possible.
                         */
                        ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                }
        }
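
        /* Also close the chunk on the relay daemon for streamed sessions. */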
        if (relayd_id) {
                struct consumer_relayd_sock_pair *relayd;
                int ret;

                relayd = consumer_find_relayd(*relayd_id);
                if (relayd) {
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        ret = relayd_close_trace_chunk(
                                        &relayd->control_sock, chunk);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                } else {
                        ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
                                        *relayd_id);
                }

                if (!relayd || ret) {
                        ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                }
        }
        rcu_read_unlock();
end:
        /*
         * Release the reference returned by the "find" operation and
         * the session daemon's implicit reference to the chunk.
         */
        lttng_trace_chunk_put(chunk);
        lttng_trace_chunk_put(chunk);

        return ret_code;
}
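
/*
 * Check whether a trace chunk is known for a given session, either locally
 * in the consumer daemon's registry or, for streamed sessions, on the relay
 * daemon.
 */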
enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id)
{
        int ret;
        enum lttcomm_return_code ret_code;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
        const bool is_local_trace = !relayd_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
        bool chunk_exists_remote;

        if (relayd_id) {
                /* Only used for logging purposes. */
                ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
                                "%" PRIu64, *relayd_id);
                if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
                        relayd_id_str = relayd_id_buffer;
                } else {
                        relayd_id_str = "(formatting error)";
                }
        }
4748 DBG("Consumer trace chunk exists command: relayd_id = %s"
4749 ", chunk_id = %" PRIu64
, relayd_id_str
,
4751 chunk
= lttng_trace_chunk_registry_find_chunk(
4752 consumer_data
.chunk_registry
, session_id
,
4754 DBG("Trace chunk %s locally", chunk
? "exists" : "does not exist");
4756 ret_code
= LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL
;
4757 lttng_trace_chunk_put(chunk
);
4759 } else if (is_local_trace
) {
4760 ret_code
= LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK
;
4765 relayd
= consumer_find_relayd(*relayd_id
);
4767 ERR("Failed to find relayd %" PRIu64
, *relayd_id
);
4768 ret_code
= LTTCOMM_CONSUMERD_INVALID_PARAMETERS
;
4769 goto end_rcu_unlock
;
4771 DBG("Looking up existence of trace chunk on relay daemon");
4772 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
4773 ret
= relayd_trace_chunk_exists(&relayd
->control_sock
, chunk_id
,
4774 &chunk_exists_remote
);
4775 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
4777 ERR("Failed to look-up the existence of trace chunk on relay daemon");
4778 ret_code
= LTTCOMM_CONSUMERD_RELAYD_FAIL
;
4779 goto end_rcu_unlock
;
4782 ret_code
= chunk_exists_remote
?
4783 LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE
:
4784 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK
;
4785 DBG("Trace chunk %s on relay daemon",
4786 chunk_exists_remote
? "exists" : "does not exist");