2 * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * SPDX-License-Identifier: GPL-2.0-only
17 #include <sys/socket.h>
18 #include <sys/types.h>
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/kernel-ctl/kernel-ctl.h>
27 #include <common/sessiond-comm/sessiond-comm.h>
28 #include <common/sessiond-comm/relayd.h>
29 #include <common/compat/fcntl.h>
30 #include <common/compat/endian.h>
31 #include <common/pipe.h>
32 #include <common/relayd/relayd.h>
33 #include <common/utils.h>
34 #include <common/consumer/consumer-stream.h>
35 #include <common/index/index.h>
36 #include <common/consumer/consumer-timer.h>
37 #include <common/optional.h>
38 #include <common/buffer-view.h>
39 #include <common/consumer/consumer.h>
40 #include <common/consumer/metadata-bucket.h>
42 #include "kernel-consumer.h"
44 extern struct lttng_consumer_global_data consumer_data
;
45 extern int consumer_poll_timeout
;
48 * Take a snapshot for a specific fd
50 * Returns 0 on success, < 0 on error
52 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream
*stream
)
55 int infd
= stream
->wait_fd
;
57 ret
= kernctl_snapshot(infd
);
59 * -EAGAIN is not an error, it just means that there is no data to
62 if (ret
!= 0 && ret
!= -EAGAIN
) {
63 PERROR("Getting sub-buffer snapshot.");
70 * Sample consumed and produced positions for a specific fd.
72 * Returns 0 on success, < 0 on error.
74 int lttng_kconsumer_sample_snapshot_positions(
75 struct lttng_consumer_stream
*stream
)
79 return kernctl_snapshot_sample_positions(stream
->wait_fd
);
83 * Get the produced position
85 * Returns 0 on success, < 0 on error
87 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream
*stream
,
91 int infd
= stream
->wait_fd
;
93 ret
= kernctl_snapshot_get_produced(infd
, pos
);
95 PERROR("kernctl_snapshot_get_produced");
102 * Get the consumed position
104 * Returns 0 on success, < 0 on error
106 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream
*stream
,
110 int infd
= stream
->wait_fd
;
112 ret
= kernctl_snapshot_get_consumed(infd
, pos
);
114 PERROR("kernctl_snapshot_get_consumed");
121 int get_current_subbuf_addr(struct lttng_consumer_stream
*stream
,
125 unsigned long mmap_offset
;
126 const char *mmap_base
= stream
->mmap_base
;
128 ret
= kernctl_get_mmap_read_offset(stream
->wait_fd
, &mmap_offset
);
130 PERROR("Failed to get mmap read offset");
134 *addr
= mmap_base
+ mmap_offset
;
140 * Take a snapshot of all the streams of a channel
141 * RCU read-side lock must be held across this function to ensure existence of
142 * channel. The channel lock must be held by the caller.
144 * Returns 0 on success, < 0 on error
146 static int lttng_kconsumer_snapshot_channel(
147 struct lttng_consumer_channel
*channel
,
148 uint64_t key
, char *path
, uint64_t relayd_id
,
149 uint64_t nb_packets_per_stream
,
150 struct lttng_consumer_local_data
*ctx
)
153 struct lttng_consumer_stream
*stream
;
155 DBG("Kernel consumer snapshot channel %" PRIu64
, key
);
159 /* Splice is not supported yet for channel snapshot. */
160 if (channel
->output
!= CONSUMER_CHANNEL_MMAP
) {
161 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
167 cds_list_for_each_entry(stream
, &channel
->streams
.head
, send_node
) {
168 unsigned long consumed_pos
, produced_pos
;
170 health_code_update();
173 * Lock stream because we are about to change its state.
175 pthread_mutex_lock(&stream
->lock
);
177 assert(channel
->trace_chunk
);
178 if (!lttng_trace_chunk_get(channel
->trace_chunk
)) {
180 * Can't happen barring an internal error as the channel
181 * holds a reference to the trace chunk.
183 ERR("Failed to acquire reference to channel's trace chunk");
187 assert(!stream
->trace_chunk
);
188 stream
->trace_chunk
= channel
->trace_chunk
;
191 * Assign the received relayd ID so we can use it for streaming. The streams
192 * are not visible to anyone so this is OK to change it.
194 stream
->net_seq_idx
= relayd_id
;
195 channel
->relayd_id
= relayd_id
;
196 if (relayd_id
!= (uint64_t) -1ULL) {
197 ret
= consumer_send_relayd_stream(stream
, path
);
199 ERR("sending stream to relayd");
203 ret
= consumer_stream_create_output_files(stream
,
208 DBG("Kernel consumer snapshot stream (%" PRIu64
")",
212 ret
= kernctl_buffer_flush_empty(stream
->wait_fd
);
215 * Doing a buffer flush which does not take into
216 * account empty packets. This is not perfect
217 * for stream intersection, but required as a
218 * fall-back when "flush_empty" is not
219 * implemented by lttng-modules.
221 ret
= kernctl_buffer_flush(stream
->wait_fd
);
223 ERR("Failed to flush kernel stream");
229 ret
= lttng_kconsumer_take_snapshot(stream
);
231 ERR("Taking kernel snapshot");
235 ret
= lttng_kconsumer_get_produced_snapshot(stream
, &produced_pos
);
237 ERR("Produced kernel snapshot position");
241 ret
= lttng_kconsumer_get_consumed_snapshot(stream
, &consumed_pos
);
243 ERR("Consumerd kernel snapshot position");
247 consumed_pos
= consumer_get_consume_start_pos(consumed_pos
,
248 produced_pos
, nb_packets_per_stream
,
249 stream
->max_sb_size
);
251 while ((long) (consumed_pos
- produced_pos
) < 0) {
253 unsigned long len
, padded_len
;
254 const char *subbuf_addr
;
255 struct lttng_buffer_view subbuf_view
;
257 health_code_update();
258 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos
);
260 ret
= kernctl_get_subbuf(stream
->wait_fd
, &consumed_pos
);
262 if (ret
!= -EAGAIN
) {
263 PERROR("kernctl_get_subbuf snapshot");
266 DBG("Kernel consumer get subbuf failed. Skipping it.");
267 consumed_pos
+= stream
->max_sb_size
;
268 stream
->chan
->lost_packets
++;
272 ret
= kernctl_get_subbuf_size(stream
->wait_fd
, &len
);
274 ERR("Snapshot kernctl_get_subbuf_size");
275 goto error_put_subbuf
;
278 ret
= kernctl_get_padded_subbuf_size(stream
->wait_fd
, &padded_len
);
280 ERR("Snapshot kernctl_get_padded_subbuf_size");
281 goto error_put_subbuf
;
284 ret
= get_current_subbuf_addr(stream
, &subbuf_addr
);
286 goto error_put_subbuf
;
289 subbuf_view
= lttng_buffer_view_init(
290 subbuf_addr
, 0, padded_len
);
291 read_len
= lttng_consumer_on_read_subbuffer_mmap(
292 stream
, &subbuf_view
,
295 * We write the padded len in local tracefiles but the data len
296 * when using a relay. Display the error but continue processing
297 * to try to release the subbuffer.
299 if (relayd_id
!= (uint64_t) -1ULL) {
300 if (read_len
!= len
) {
301 ERR("Error sending to the relay (ret: %zd != len: %lu)",
305 if (read_len
!= padded_len
) {
306 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
307 read_len
, padded_len
);
311 ret
= kernctl_put_subbuf(stream
->wait_fd
);
313 ERR("Snapshot kernctl_put_subbuf");
316 consumed_pos
+= stream
->max_sb_size
;
319 if (relayd_id
== (uint64_t) -1ULL) {
320 if (stream
->out_fd
>= 0) {
321 ret
= close(stream
->out_fd
);
323 PERROR("Kernel consumer snapshot close out_fd");
329 close_relayd_stream(stream
);
330 stream
->net_seq_idx
= (uint64_t) -1ULL;
332 lttng_trace_chunk_put(stream
->trace_chunk
);
333 stream
->trace_chunk
= NULL
;
334 pthread_mutex_unlock(&stream
->lock
);
342 ret
= kernctl_put_subbuf(stream
->wait_fd
);
344 ERR("Snapshot kernctl_put_subbuf error path");
347 pthread_mutex_unlock(&stream
->lock
);
354 * Read the whole metadata available for a snapshot.
355 * RCU read-side lock must be held across this function to ensure existence of
356 * metadata_channel. The channel lock must be held by the caller.
358 * Returns 0 on success, < 0 on error
360 static int lttng_kconsumer_snapshot_metadata(
361 struct lttng_consumer_channel
*metadata_channel
,
362 uint64_t key
, char *path
, uint64_t relayd_id
,
363 struct lttng_consumer_local_data
*ctx
)
365 int ret
, use_relayd
= 0;
367 struct lttng_consumer_stream
*metadata_stream
;
371 DBG("Kernel consumer snapshot metadata with key %" PRIu64
" at path %s",
376 metadata_stream
= metadata_channel
->metadata_stream
;
377 assert(metadata_stream
);
379 pthread_mutex_lock(&metadata_stream
->lock
);
380 assert(metadata_channel
->trace_chunk
);
381 assert(metadata_stream
->trace_chunk
);
383 /* Flag once that we have a valid relayd for the stream. */
384 if (relayd_id
!= (uint64_t) -1ULL) {
389 ret
= consumer_send_relayd_stream(metadata_stream
, path
);
394 ret
= consumer_stream_create_output_files(metadata_stream
,
402 health_code_update();
404 ret_read
= lttng_consumer_read_subbuffer(metadata_stream
, ctx
, true);
406 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
411 } while (ret_read
> 0);
414 close_relayd_stream(metadata_stream
);
415 metadata_stream
->net_seq_idx
= (uint64_t) -1ULL;
417 if (metadata_stream
->out_fd
>= 0) {
418 ret
= close(metadata_stream
->out_fd
);
420 PERROR("Kernel consumer snapshot metadata close out_fd");
422 * Don't go on error here since the snapshot was successful at this
423 * point but somehow the close failed.
426 metadata_stream
->out_fd
= -1;
427 lttng_trace_chunk_put(metadata_stream
->trace_chunk
);
428 metadata_stream
->trace_chunk
= NULL
;
434 pthread_mutex_unlock(&metadata_stream
->lock
);
435 cds_list_del(&metadata_stream
->send_node
);
436 consumer_stream_destroy(metadata_stream
, NULL
);
437 metadata_channel
->metadata_stream
= NULL
;
443 * Receive command from session daemon and process it.
445 * Return 1 on success else a negative value or 0.
447 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
448 int sock
, struct pollfd
*consumer_sockpoll
)
451 enum lttcomm_return_code ret_code
= LTTCOMM_CONSUMERD_SUCCESS
;
452 struct lttcomm_consumer_msg msg
;
454 health_code_update();
456 ret
= lttcomm_recv_unix_sock(sock
, &msg
, sizeof(msg
));
457 if (ret
!= sizeof(msg
)) {
459 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_ERROR_RECV_CMD
);
465 health_code_update();
467 /* Deprecated command */
468 assert(msg
.cmd_type
!= LTTNG_CONSUMER_STOP
);
470 health_code_update();
472 /* relayd needs RCU read-side protection */
475 switch (msg
.cmd_type
) {
476 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET
:
478 /* Session daemon status messages are handled in the following call. */
479 consumer_add_relayd_socket(msg
.u
.relayd_sock
.net_index
,
480 msg
.u
.relayd_sock
.type
, ctx
, sock
, consumer_sockpoll
,
481 &msg
.u
.relayd_sock
.sock
, msg
.u
.relayd_sock
.session_id
,
482 msg
.u
.relayd_sock
.relayd_session_id
);
485 case LTTNG_CONSUMER_ADD_CHANNEL
:
487 struct lttng_consumer_channel
*new_channel
;
489 const uint64_t chunk_id
= msg
.u
.channel
.chunk_id
.value
;
491 health_code_update();
493 /* First send a status message before receiving the fds. */
494 ret
= consumer_send_status_msg(sock
, ret_code
);
496 /* Somehow, the session daemon is not responding anymore. */
500 health_code_update();
502 DBG("consumer_add_channel %" PRIu64
, msg
.u
.channel
.channel_key
);
503 new_channel
= consumer_allocate_channel(msg
.u
.channel
.channel_key
,
504 msg
.u
.channel
.session_id
,
505 msg
.u
.channel
.chunk_id
.is_set
?
507 msg
.u
.channel
.pathname
,
509 msg
.u
.channel
.relayd_id
, msg
.u
.channel
.output
,
510 msg
.u
.channel
.tracefile_size
,
511 msg
.u
.channel
.tracefile_count
, 0,
512 msg
.u
.channel
.monitor
,
513 msg
.u
.channel
.live_timer_interval
,
514 msg
.u
.channel
.is_live
,
516 if (new_channel
== NULL
) {
517 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_OUTFD_ERROR
);
520 new_channel
->nb_init_stream_left
= msg
.u
.channel
.nb_init_streams
;
521 switch (msg
.u
.channel
.output
) {
522 case LTTNG_EVENT_SPLICE
:
523 new_channel
->output
= CONSUMER_CHANNEL_SPLICE
;
525 case LTTNG_EVENT_MMAP
:
526 new_channel
->output
= CONSUMER_CHANNEL_MMAP
;
529 ERR("Channel output unknown %d", msg
.u
.channel
.output
);
533 /* Translate and save channel type. */
534 switch (msg
.u
.channel
.type
) {
535 case CONSUMER_CHANNEL_TYPE_DATA
:
536 case CONSUMER_CHANNEL_TYPE_METADATA
:
537 new_channel
->type
= msg
.u
.channel
.type
;
544 health_code_update();
546 if (ctx
->on_recv_channel
!= NULL
) {
547 ret_recv
= ctx
->on_recv_channel(new_channel
);
549 ret
= consumer_add_channel(new_channel
, ctx
);
550 } else if (ret_recv
< 0) {
554 ret
= consumer_add_channel(new_channel
, ctx
);
556 if (msg
.u
.channel
.type
== CONSUMER_CHANNEL_TYPE_DATA
&& !ret
) {
557 int monitor_start_ret
;
559 DBG("Consumer starting monitor timer");
560 consumer_timer_live_start(new_channel
,
561 msg
.u
.channel
.live_timer_interval
);
562 monitor_start_ret
= consumer_timer_monitor_start(
564 msg
.u
.channel
.monitor_timer_interval
);
565 if (monitor_start_ret
< 0) {
566 ERR("Starting channel monitoring timer failed");
572 health_code_update();
574 /* If we received an error in add_channel, we need to report it. */
576 ret
= consumer_send_status_msg(sock
, ret
);
585 case LTTNG_CONSUMER_ADD_STREAM
:
588 struct lttng_pipe
*stream_pipe
;
589 struct lttng_consumer_stream
*new_stream
;
590 struct lttng_consumer_channel
*channel
;
594 * Get stream's channel reference. Needed when adding the stream to the
597 channel
= consumer_find_channel(msg
.u
.stream
.channel_key
);
600 * We could not find the channel. Can happen if cpu hotplug
601 * happens while tearing down.
603 ERR("Unable to find channel key %" PRIu64
, msg
.u
.stream
.channel_key
);
604 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
607 health_code_update();
609 /* First send a status message before receiving the fds. */
610 ret
= consumer_send_status_msg(sock
, ret_code
);
612 /* Somehow, the session daemon is not responding anymore. */
613 goto error_add_stream_fatal
;
616 health_code_update();
618 if (ret_code
!= LTTCOMM_CONSUMERD_SUCCESS
) {
619 /* Channel was not found. */
620 goto error_add_stream_nosignal
;
625 ret
= lttng_consumer_poll_socket(consumer_sockpoll
);
628 goto error_add_stream_fatal
;
631 health_code_update();
633 /* Get stream file descriptor from socket */
634 ret
= lttcomm_recv_fds_unix_sock(sock
, &fd
, 1);
635 if (ret
!= sizeof(fd
)) {
636 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_ERROR_RECV_FD
);
640 health_code_update();
643 * Send status code to session daemon only if the recv works. If the
644 * above recv() failed, the session daemon is notified through the
645 * error socket and the teardown is eventually done.
647 ret
= consumer_send_status_msg(sock
, ret_code
);
649 /* Somehow, the session daemon is not responding anymore. */
650 goto error_add_stream_nosignal
;
653 health_code_update();
655 pthread_mutex_lock(&channel
->lock
);
656 new_stream
= consumer_stream_create(
663 channel
->trace_chunk
,
668 if (new_stream
== NULL
) {
673 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_OUTFD_ERROR
);
676 pthread_mutex_unlock(&channel
->lock
);
677 goto error_add_stream_nosignal
;
680 new_stream
->wait_fd
= fd
;
681 ret
= kernctl_get_max_subbuf_size(new_stream
->wait_fd
,
682 &new_stream
->max_sb_size
);
684 pthread_mutex_unlock(&channel
->lock
);
685 ERR("Failed to get kernel maximal subbuffer size");
686 goto error_add_stream_nosignal
;
689 consumer_stream_update_channel_attributes(new_stream
,
693 * We've just assigned the channel to the stream so increment the
694 * refcount right now. We don't need to increment the refcount for
695 * streams in no monitor because we handle manually the cleanup of
696 * those. It is very important to make sure there is NO prior
697 * consumer_del_stream() calls or else the refcount will be unbalanced.
699 if (channel
->monitor
) {
700 uatomic_inc(&new_stream
->chan
->refcount
);
704 * The buffer flush is done on the session daemon side for the kernel
705 * so no need for the stream "hangup_flush_done" variable to be
706 * tracked. This is important for a kernel stream since we don't rely
707 * on the flush state of the stream to read data. It's not the case for
708 * user space tracing.
710 new_stream
->hangup_flush_done
= 0;
712 health_code_update();
714 pthread_mutex_lock(&new_stream
->lock
);
715 if (ctx
->on_recv_stream
) {
716 ret
= ctx
->on_recv_stream(new_stream
);
718 pthread_mutex_unlock(&new_stream
->lock
);
719 pthread_mutex_unlock(&channel
->lock
);
720 consumer_stream_free(new_stream
);
721 goto error_add_stream_nosignal
;
724 health_code_update();
726 if (new_stream
->metadata_flag
) {
727 channel
->metadata_stream
= new_stream
;
730 /* Do not monitor this stream. */
731 if (!channel
->monitor
) {
732 DBG("Kernel consumer add stream %s in no monitor mode with "
733 "relayd id %" PRIu64
, new_stream
->name
,
734 new_stream
->net_seq_idx
);
735 cds_list_add(&new_stream
->send_node
, &channel
->streams
.head
);
736 pthread_mutex_unlock(&new_stream
->lock
);
737 pthread_mutex_unlock(&channel
->lock
);
741 /* Send stream to relayd if the stream has an ID. */
742 if (new_stream
->net_seq_idx
!= (uint64_t) -1ULL) {
743 ret
= consumer_send_relayd_stream(new_stream
,
744 new_stream
->chan
->pathname
);
746 pthread_mutex_unlock(&new_stream
->lock
);
747 pthread_mutex_unlock(&channel
->lock
);
748 consumer_stream_free(new_stream
);
749 goto error_add_stream_nosignal
;
753 * If adding an extra stream to an already
754 * existing channel (e.g. cpu hotplug), we need
755 * to send the "streams_sent" command to relayd.
757 if (channel
->streams_sent_to_relayd
) {
758 ret
= consumer_send_relayd_streams_sent(
759 new_stream
->net_seq_idx
);
761 pthread_mutex_unlock(&new_stream
->lock
);
762 pthread_mutex_unlock(&channel
->lock
);
763 goto error_add_stream_nosignal
;
767 pthread_mutex_unlock(&new_stream
->lock
);
768 pthread_mutex_unlock(&channel
->lock
);
770 /* Get the right pipe where the stream will be sent. */
771 if (new_stream
->metadata_flag
) {
772 consumer_add_metadata_stream(new_stream
);
773 stream_pipe
= ctx
->consumer_metadata_pipe
;
775 consumer_add_data_stream(new_stream
);
776 stream_pipe
= ctx
->consumer_data_pipe
;
779 /* Visible to other threads */
780 new_stream
->globally_visible
= 1;
782 health_code_update();
784 ret
= lttng_pipe_write(stream_pipe
, &new_stream
, sizeof(new_stream
));
786 ERR("Consumer write %s stream to pipe %d",
787 new_stream
->metadata_flag
? "metadata" : "data",
788 lttng_pipe_get_writefd(stream_pipe
));
789 if (new_stream
->metadata_flag
) {
790 consumer_del_stream_for_metadata(new_stream
);
792 consumer_del_stream_for_data(new_stream
);
794 goto error_add_stream_nosignal
;
797 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64
,
798 new_stream
->name
, fd
, new_stream
->chan
->pathname
, new_stream
->relayd_stream_id
);
801 error_add_stream_nosignal
:
803 error_add_stream_fatal
:
806 case LTTNG_CONSUMER_STREAMS_SENT
:
808 struct lttng_consumer_channel
*channel
;
811 * Get stream's channel reference. Needed when adding the stream to the
814 channel
= consumer_find_channel(msg
.u
.sent_streams
.channel_key
);
817 * We could not find the channel. Can happen if cpu hotplug
818 * happens while tearing down.
820 ERR("Unable to find channel key %" PRIu64
,
821 msg
.u
.sent_streams
.channel_key
);
822 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
825 health_code_update();
828 * Send status code to session daemon.
830 ret
= consumer_send_status_msg(sock
, ret_code
);
831 if (ret
< 0 || ret_code
!= LTTCOMM_CONSUMERD_SUCCESS
) {
832 /* Somehow, the session daemon is not responding anymore. */
833 goto error_streams_sent_nosignal
;
836 health_code_update();
839 * We should not send this message if we don't monitor the
840 * streams in this channel.
842 if (!channel
->monitor
) {
843 goto end_error_streams_sent
;
846 health_code_update();
847 /* Send stream to relayd if the stream has an ID. */
848 if (msg
.u
.sent_streams
.net_seq_idx
!= (uint64_t) -1ULL) {
849 ret
= consumer_send_relayd_streams_sent(
850 msg
.u
.sent_streams
.net_seq_idx
);
852 goto error_streams_sent_nosignal
;
854 channel
->streams_sent_to_relayd
= true;
856 end_error_streams_sent
:
858 error_streams_sent_nosignal
:
861 case LTTNG_CONSUMER_UPDATE_STREAM
:
866 case LTTNG_CONSUMER_DESTROY_RELAYD
:
868 uint64_t index
= msg
.u
.destroy_relayd
.net_seq_idx
;
869 struct consumer_relayd_sock_pair
*relayd
;
871 DBG("Kernel consumer destroying relayd %" PRIu64
, index
);
873 /* Get relayd reference if exists. */
874 relayd
= consumer_find_relayd(index
);
875 if (relayd
== NULL
) {
876 DBG("Unable to find relayd %" PRIu64
, index
);
877 ret_code
= LTTCOMM_CONSUMERD_RELAYD_FAIL
;
881 * Each relayd socket pair has a refcount of stream attached to it
882 * which tells if the relayd is still active or not depending on the
885 * This will set the destroy flag of the relayd object and destroy it
886 * if the refcount reaches zero when called.
888 * The destroy can happen either here or when a stream fd hangs up.
891 consumer_flag_relayd_for_destroy(relayd
);
894 health_code_update();
896 ret
= consumer_send_status_msg(sock
, ret_code
);
898 /* Somehow, the session daemon is not responding anymore. */
904 case LTTNG_CONSUMER_DATA_PENDING
:
907 uint64_t id
= msg
.u
.data_pending
.session_id
;
909 DBG("Kernel consumer data pending command for id %" PRIu64
, id
);
911 ret
= consumer_data_pending(id
);
913 health_code_update();
915 /* Send back returned value to session daemon */
916 ret
= lttcomm_send_unix_sock(sock
, &ret
, sizeof(ret
));
918 PERROR("send data pending ret code");
923 * No need to send back a status message since the data pending
924 * returned value is the response.
928 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL
:
930 struct lttng_consumer_channel
*channel
;
931 uint64_t key
= msg
.u
.snapshot_channel
.key
;
933 channel
= consumer_find_channel(key
);
935 ERR("Channel %" PRIu64
" not found", key
);
936 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
938 pthread_mutex_lock(&channel
->lock
);
939 if (msg
.u
.snapshot_channel
.metadata
== 1) {
940 ret
= lttng_kconsumer_snapshot_metadata(channel
, key
,
941 msg
.u
.snapshot_channel
.pathname
,
942 msg
.u
.snapshot_channel
.relayd_id
, ctx
);
944 ERR("Snapshot metadata failed");
945 ret_code
= LTTCOMM_CONSUMERD_SNAPSHOT_FAILED
;
948 ret
= lttng_kconsumer_snapshot_channel(channel
, key
,
949 msg
.u
.snapshot_channel
.pathname
,
950 msg
.u
.snapshot_channel
.relayd_id
,
951 msg
.u
.snapshot_channel
.nb_packets_per_stream
,
954 ERR("Snapshot channel failed");
955 ret_code
= LTTCOMM_CONSUMERD_SNAPSHOT_FAILED
;
958 pthread_mutex_unlock(&channel
->lock
);
960 health_code_update();
962 ret
= consumer_send_status_msg(sock
, ret_code
);
964 /* Somehow, the session daemon is not responding anymore. */
969 case LTTNG_CONSUMER_DESTROY_CHANNEL
:
971 uint64_t key
= msg
.u
.destroy_channel
.key
;
972 struct lttng_consumer_channel
*channel
;
974 channel
= consumer_find_channel(key
);
976 ERR("Kernel consumer destroy channel %" PRIu64
" not found", key
);
977 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
980 health_code_update();
982 ret
= consumer_send_status_msg(sock
, ret_code
);
984 /* Somehow, the session daemon is not responding anymore. */
985 goto end_destroy_channel
;
988 health_code_update();
990 /* Stop right now if no channel was found. */
992 goto end_destroy_channel
;
996 * This command should ONLY be issued for channel with streams set in
999 assert(!channel
->monitor
);
1002 * The refcount should ALWAYS be 0 in the case of a channel in no
1005 assert(!uatomic_sub_return(&channel
->refcount
, 1));
1007 consumer_del_channel(channel
);
1008 end_destroy_channel
:
1011 case LTTNG_CONSUMER_DISCARDED_EVENTS
:
1015 struct lttng_consumer_channel
*channel
;
1016 uint64_t id
= msg
.u
.discarded_events
.session_id
;
1017 uint64_t key
= msg
.u
.discarded_events
.channel_key
;
1019 DBG("Kernel consumer discarded events command for session id %"
1020 PRIu64
", channel key %" PRIu64
, id
, key
);
1022 channel
= consumer_find_channel(key
);
1024 ERR("Kernel consumer discarded events channel %"
1025 PRIu64
" not found", key
);
1028 count
= channel
->discarded_events
;
1031 health_code_update();
1033 /* Send back returned value to session daemon */
1034 ret
= lttcomm_send_unix_sock(sock
, &count
, sizeof(count
));
1036 PERROR("send discarded events");
1042 case LTTNG_CONSUMER_LOST_PACKETS
:
1046 struct lttng_consumer_channel
*channel
;
1047 uint64_t id
= msg
.u
.lost_packets
.session_id
;
1048 uint64_t key
= msg
.u
.lost_packets
.channel_key
;
1050 DBG("Kernel consumer lost packets command for session id %"
1051 PRIu64
", channel key %" PRIu64
, id
, key
);
1053 channel
= consumer_find_channel(key
);
1055 ERR("Kernel consumer lost packets channel %"
1056 PRIu64
" not found", key
);
1059 count
= channel
->lost_packets
;
1062 health_code_update();
1064 /* Send back returned value to session daemon */
1065 ret
= lttcomm_send_unix_sock(sock
, &count
, sizeof(count
));
1067 PERROR("send lost packets");
1073 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE
:
1075 int channel_monitor_pipe
;
1077 ret_code
= LTTCOMM_CONSUMERD_SUCCESS
;
1078 /* Successfully received the command's type. */
1079 ret
= consumer_send_status_msg(sock
, ret_code
);
1084 ret
= lttcomm_recv_fds_unix_sock(sock
, &channel_monitor_pipe
,
1086 if (ret
!= sizeof(channel_monitor_pipe
)) {
1087 ERR("Failed to receive channel monitor pipe");
1091 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe
);
1092 ret
= consumer_timer_thread_set_channel_monitor_pipe(
1093 channel_monitor_pipe
);
1097 ret_code
= LTTCOMM_CONSUMERD_SUCCESS
;
1098 /* Set the pipe as non-blocking. */
1099 ret
= fcntl(channel_monitor_pipe
, F_GETFL
, 0);
1101 PERROR("fcntl get flags of the channel monitoring pipe");
1106 ret
= fcntl(channel_monitor_pipe
, F_SETFL
,
1107 flags
| O_NONBLOCK
);
1109 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1112 DBG("Channel monitor pipe set as non-blocking");
1114 ret_code
= LTTCOMM_CONSUMERD_ALREADY_SET
;
1116 ret
= consumer_send_status_msg(sock
, ret_code
);
1122 case LTTNG_CONSUMER_ROTATE_CHANNEL
:
1124 struct lttng_consumer_channel
*channel
;
1125 uint64_t key
= msg
.u
.rotate_channel
.key
;
1127 DBG("Consumer rotate channel %" PRIu64
, key
);
1129 channel
= consumer_find_channel(key
);
1131 ERR("Channel %" PRIu64
" not found", key
);
1132 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
1135 * Sample the rotate position of all the streams in this channel.
1137 ret
= lttng_consumer_rotate_channel(channel
, key
,
1138 msg
.u
.rotate_channel
.relayd_id
,
1139 msg
.u
.rotate_channel
.metadata
,
1142 ERR("Rotate channel failed");
1143 ret_code
= LTTCOMM_CONSUMERD_ROTATION_FAIL
;
1146 health_code_update();
1148 ret
= consumer_send_status_msg(sock
, ret_code
);
1150 /* Somehow, the session daemon is not responding anymore. */
1151 goto error_rotate_channel
;
1154 /* Rotate the streams that are ready right now. */
1155 ret
= lttng_consumer_rotate_ready_streams(
1158 ERR("Rotate ready streams failed");
1162 error_rotate_channel
:
1165 case LTTNG_CONSUMER_CLEAR_CHANNEL
:
1167 struct lttng_consumer_channel
*channel
;
1168 uint64_t key
= msg
.u
.clear_channel
.key
;
1170 channel
= consumer_find_channel(key
);
1172 DBG("Channel %" PRIu64
" not found", key
);
1173 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
1175 ret
= lttng_consumer_clear_channel(channel
);
1177 ERR("Clear channel failed");
1181 health_code_update();
1183 ret
= consumer_send_status_msg(sock
, ret_code
);
1185 /* Somehow, the session daemon is not responding anymore. */
1191 case LTTNG_CONSUMER_INIT
:
1193 ret_code
= lttng_consumer_init_command(ctx
,
1194 msg
.u
.init
.sessiond_uuid
);
1195 health_code_update();
1196 ret
= consumer_send_status_msg(sock
, ret_code
);
1198 /* Somehow, the session daemon is not responding anymore. */
1203 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK
:
1205 const struct lttng_credentials credentials
= {
1206 .uid
= msg
.u
.create_trace_chunk
.credentials
.value
.uid
,
1207 .gid
= msg
.u
.create_trace_chunk
.credentials
.value
.gid
,
1209 const bool is_local_trace
=
1210 !msg
.u
.create_trace_chunk
.relayd_id
.is_set
;
1211 const uint64_t relayd_id
=
1212 msg
.u
.create_trace_chunk
.relayd_id
.value
;
1213 const char *chunk_override_name
=
1214 *msg
.u
.create_trace_chunk
.override_name
?
1215 msg
.u
.create_trace_chunk
.override_name
:
1217 struct lttng_directory_handle
*chunk_directory_handle
= NULL
;
1220 * The session daemon will only provide a chunk directory file
1221 * descriptor for local traces.
1223 if (is_local_trace
) {
1226 /* Acknowledge the reception of the command. */
1227 ret
= consumer_send_status_msg(sock
,
1228 LTTCOMM_CONSUMERD_SUCCESS
);
1230 /* Somehow, the session daemon is not responding anymore. */
1234 ret
= lttcomm_recv_fds_unix_sock(sock
, &chunk_dirfd
, 1);
1235 if (ret
!= sizeof(chunk_dirfd
)) {
1236 ERR("Failed to receive trace chunk directory file descriptor");
1240 DBG("Received trace chunk directory fd (%d)",
1242 chunk_directory_handle
= lttng_directory_handle_create_from_dirfd(
1244 if (!chunk_directory_handle
) {
1245 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1246 if (close(chunk_dirfd
)) {
1247 PERROR("Failed to close chunk directory file descriptor");
1253 ret_code
= lttng_consumer_create_trace_chunk(
1254 !is_local_trace
? &relayd_id
: NULL
,
1255 msg
.u
.create_trace_chunk
.session_id
,
1256 msg
.u
.create_trace_chunk
.chunk_id
,
1257 (time_t) msg
.u
.create_trace_chunk
1258 .creation_timestamp
,
1259 chunk_override_name
,
1260 msg
.u
.create_trace_chunk
.credentials
.is_set
?
1263 chunk_directory_handle
);
1264 lttng_directory_handle_put(chunk_directory_handle
);
1265 goto end_msg_sessiond
;
1267 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK
:
1269 enum lttng_trace_chunk_command_type close_command
=
1270 msg
.u
.close_trace_chunk
.close_command
.value
;
1271 const uint64_t relayd_id
=
1272 msg
.u
.close_trace_chunk
.relayd_id
.value
;
1273 struct lttcomm_consumer_close_trace_chunk_reply reply
;
1274 char path
[LTTNG_PATH_MAX
];
1276 ret_code
= lttng_consumer_close_trace_chunk(
1277 msg
.u
.close_trace_chunk
.relayd_id
.is_set
?
1280 msg
.u
.close_trace_chunk
.session_id
,
1281 msg
.u
.close_trace_chunk
.chunk_id
,
1282 (time_t) msg
.u
.close_trace_chunk
.close_timestamp
,
1283 msg
.u
.close_trace_chunk
.close_command
.is_set
?
1286 reply
.ret_code
= ret_code
;
1287 reply
.path_length
= strlen(path
) + 1;
1288 ret
= lttcomm_send_unix_sock(sock
, &reply
, sizeof(reply
));
1289 if (ret
!= sizeof(reply
)) {
1292 ret
= lttcomm_send_unix_sock(sock
, path
, reply
.path_length
);
1293 if (ret
!= reply
.path_length
) {
1298 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS
:
1300 const uint64_t relayd_id
=
1301 msg
.u
.trace_chunk_exists
.relayd_id
.value
;
1303 ret_code
= lttng_consumer_trace_chunk_exists(
1304 msg
.u
.trace_chunk_exists
.relayd_id
.is_set
?
1306 msg
.u
.trace_chunk_exists
.session_id
,
1307 msg
.u
.trace_chunk_exists
.chunk_id
);
1308 goto end_msg_sessiond
;
1310 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS
:
1312 const uint64_t key
= msg
.u
.open_channel_packets
.key
;
1313 struct lttng_consumer_channel
*channel
=
1314 consumer_find_channel(key
);
1317 pthread_mutex_lock(&channel
->lock
);
1318 ret_code
= lttng_consumer_open_channel_packets(channel
);
1319 pthread_mutex_unlock(&channel
->lock
);
1321 WARN("Channel %" PRIu64
" not found", key
);
1322 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
1325 health_code_update();
1326 goto end_msg_sessiond
;
1334 * Return 1 to indicate success since the 0 value can be a socket
1335 * shutdown during the recv() or send() call.
1340 /* This will issue a consumer stop. */
1345 * The returned value here is not useful since either way we'll return 1 to
1346 * the caller because the session daemon socket management is done
1347 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1349 ret
= consumer_send_status_msg(sock
, ret_code
);
1355 health_code_update();
1361 * Sync metadata, meaning request them from the session daemon and snapshot so the
1362 * metadata thread can consume them.
1364 * Metadata stream lock MUST be acquired.
1366 enum sync_metadata_status
lttng_kconsumer_sync_metadata(
1367 struct lttng_consumer_stream
*metadata
)
1370 enum sync_metadata_status status
;
1374 ret
= kernctl_buffer_flush(metadata
->wait_fd
);
1376 ERR("Failed to flush kernel stream");
1377 status
= SYNC_METADATA_STATUS_ERROR
;
1381 ret
= kernctl_snapshot(metadata
->wait_fd
);
1383 if (errno
== EAGAIN
) {
1384 /* No new metadata, exit. */
1385 DBG("Sync metadata, no new kernel metadata");
1386 status
= SYNC_METADATA_STATUS_NO_DATA
;
1388 ERR("Sync metadata, taking kernel snapshot failed.");
1389 status
= SYNC_METADATA_STATUS_ERROR
;
1392 status
= SYNC_METADATA_STATUS_NEW_DATA
;
1400 int extract_common_subbuffer_info(struct lttng_consumer_stream
*stream
,
1401 struct stream_subbuffer
*subbuf
)
1405 ret
= kernctl_get_subbuf_size(
1406 stream
->wait_fd
, &subbuf
->info
.data
.subbuf_size
);
1411 ret
= kernctl_get_padded_subbuf_size(
1412 stream
->wait_fd
, &subbuf
->info
.data
.padded_subbuf_size
);
1422 int extract_metadata_subbuffer_info(struct lttng_consumer_stream
*stream
,
1423 struct stream_subbuffer
*subbuf
)
1427 ret
= extract_common_subbuffer_info(stream
, subbuf
);
1432 ret
= kernctl_get_metadata_version(
1433 stream
->wait_fd
, &subbuf
->info
.metadata
.version
);
1443 int extract_data_subbuffer_info(struct lttng_consumer_stream
*stream
,
1444 struct stream_subbuffer
*subbuf
)
1448 ret
= extract_common_subbuffer_info(stream
, subbuf
);
1453 ret
= kernctl_get_packet_size(
1454 stream
->wait_fd
, &subbuf
->info
.data
.packet_size
);
1456 PERROR("Failed to get sub-buffer packet size");
1460 ret
= kernctl_get_content_size(
1461 stream
->wait_fd
, &subbuf
->info
.data
.content_size
);
1463 PERROR("Failed to get sub-buffer content size");
1467 ret
= kernctl_get_timestamp_begin(
1468 stream
->wait_fd
, &subbuf
->info
.data
.timestamp_begin
);
1470 PERROR("Failed to get sub-buffer begin timestamp");
1474 ret
= kernctl_get_timestamp_end(
1475 stream
->wait_fd
, &subbuf
->info
.data
.timestamp_end
);
1477 PERROR("Failed to get sub-buffer end timestamp");
1481 ret
= kernctl_get_events_discarded(
1482 stream
->wait_fd
, &subbuf
->info
.data
.events_discarded
);
1484 PERROR("Failed to get sub-buffer events discarded count");
1488 ret
= kernctl_get_sequence_number(stream
->wait_fd
,
1489 &subbuf
->info
.data
.sequence_number
.value
);
1491 /* May not be supported by older LTTng-modules. */
1492 if (ret
!= -ENOTTY
) {
1493 PERROR("Failed to get sub-buffer sequence number");
1497 subbuf
->info
.data
.sequence_number
.is_set
= true;
1500 ret
= kernctl_get_stream_id(
1501 stream
->wait_fd
, &subbuf
->info
.data
.stream_id
);
1503 PERROR("Failed to get stream id");
1507 ret
= kernctl_get_instance_id(stream
->wait_fd
,
1508 &subbuf
->info
.data
.stream_instance_id
.value
);
1510 /* May not be supported by older LTTng-modules. */
1511 if (ret
!= -ENOTTY
) {
1512 PERROR("Failed to get stream instance id");
1516 subbuf
->info
.data
.stream_instance_id
.is_set
= true;
1523 int get_subbuffer_common(struct lttng_consumer_stream
*stream
,
1524 struct stream_subbuffer
*subbuffer
)
1528 ret
= kernctl_get_next_subbuf(stream
->wait_fd
);
1531 * The caller only expects -ENODATA when there is no data to
1532 * read, but the kernel tracer returns -EAGAIN when there is
1533 * currently no data for a non-finalized stream, and -ENODATA
1534 * when there is no data for a finalized stream. Those can be
1535 * combined into a -ENODATA return value.
1537 if (ret
== -EAGAIN
) {
1544 ret
= stream
->read_subbuffer_ops
.extract_subbuffer_info(
1551 int get_next_subbuffer_splice(struct lttng_consumer_stream
*stream
,
1552 struct stream_subbuffer
*subbuffer
)
1556 ret
= get_subbuffer_common(stream
, subbuffer
);
1561 subbuffer
->buffer
.fd
= stream
->wait_fd
;
1567 int get_next_subbuffer_mmap(struct lttng_consumer_stream
*stream
,
1568 struct stream_subbuffer
*subbuffer
)
1573 ret
= get_subbuffer_common(stream
, subbuffer
);
1578 ret
= get_current_subbuf_addr(stream
, &addr
);
1583 subbuffer
->buffer
.buffer
= lttng_buffer_view_init(
1584 addr
, 0, subbuffer
->info
.data
.padded_subbuf_size
);
1590 int get_next_subbuffer_metadata_check(struct lttng_consumer_stream
*stream
,
1591 struct stream_subbuffer
*subbuffer
)
1597 ret
= kernctl_get_next_subbuf_metadata_check(stream
->wait_fd
,
1603 ret
= stream
->read_subbuffer_ops
.extract_subbuffer_info(
1609 LTTNG_OPTIONAL_SET(&subbuffer
->info
.metadata
.coherent
, coherent
);
1611 ret
= get_current_subbuf_addr(stream
, &addr
);
1616 subbuffer
->buffer
.buffer
= lttng_buffer_view_init(
1617 addr
, 0, subbuffer
->info
.data
.padded_subbuf_size
);
1618 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1619 subbuffer
->info
.metadata
.padded_subbuf_size
,
1620 coherent
? "true" : "false");
1623 * The caller only expects -ENODATA when there is no data to read, but
1624 * the kernel tracer returns -EAGAIN when there is currently no data
1625 * for a non-finalized stream, and -ENODATA when there is no data for a
1626 * finalized stream. Those can be combined into a -ENODATA return value.
1628 if (ret
== -EAGAIN
) {
1636 int put_next_subbuffer(struct lttng_consumer_stream
*stream
,
1637 struct stream_subbuffer
*subbuffer
)
1639 const int ret
= kernctl_put_next_subbuf(stream
->wait_fd
);
1642 if (ret
== -EFAULT
) {
1643 PERROR("Error in unreserving sub buffer");
1644 } else if (ret
== -EIO
) {
1645 /* Should never happen with newer LTTng versions */
1646 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
/*
 * Probe whether the tracer behind `tracer_fd` implements the "get next
 * sub-buffer with metadata check" operation.
 *
 * Any answer other than -ENOTTY is taken to mean the operation exists. If
 * the probe actually acquired a sub-buffer, it is released before
 * returning.
 */
bool is_get_next_check_metadata_available(int tracer_fd)
{
	const int probe_status =
			kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);

	if (probe_status == 0) {
		/* get succeeded, make sure to put the subbuffer. */
		kernctl_put_subbuf(tracer_fd);
	}

	return probe_status != -ENOTTY;
}
1668 int signal_metadata(struct lttng_consumer_stream
*stream
,
1669 struct lttng_consumer_local_data
*ctx
)
1671 ASSERT_LOCKED(stream
->metadata_rdv_lock
);
1672 return pthread_cond_broadcast(&stream
->metadata_rdv
) ? -errno
: 0;
1676 int lttng_kconsumer_set_stream_ops(
1677 struct lttng_consumer_stream
*stream
)
1681 if (stream
->metadata_flag
&& stream
->chan
->is_live
) {
1682 DBG("Attempting to enable metadata bucketization for live consumers");
1683 if (is_get_next_check_metadata_available(stream
->wait_fd
)) {
1684 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1685 stream
->read_subbuffer_ops
.get_next_subbuffer
=
1686 get_next_subbuffer_metadata_check
;
1687 ret
= consumer_stream_enable_metadata_bucketization(
1694 * The kernel tracer version is too old to indicate
1695 * when the metadata stream has reached a "coherent"
1696 * (parseable) point.
1698 * This means that a live viewer may see an incoherent
1699 * sequence of metadata and fail to parse it.
1701 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1702 metadata_bucket_destroy(stream
->metadata_bucket
);
1703 stream
->metadata_bucket
= NULL
;
1706 stream
->read_subbuffer_ops
.on_sleep
= signal_metadata
;
1709 if (!stream
->read_subbuffer_ops
.get_next_subbuffer
) {
1710 if (stream
->chan
->output
== CONSUMER_CHANNEL_MMAP
) {
1711 stream
->read_subbuffer_ops
.get_next_subbuffer
=
1712 get_next_subbuffer_mmap
;
1714 stream
->read_subbuffer_ops
.get_next_subbuffer
=
1715 get_next_subbuffer_splice
;
1719 if (stream
->metadata_flag
) {
1720 stream
->read_subbuffer_ops
.extract_subbuffer_info
=
1721 extract_metadata_subbuffer_info
;
1723 stream
->read_subbuffer_ops
.extract_subbuffer_info
=
1724 extract_data_subbuffer_info
;
1725 if (stream
->chan
->is_live
) {
1726 stream
->read_subbuffer_ops
.send_live_beacon
=
1727 consumer_flush_kernel_index
;
1731 stream
->read_subbuffer_ops
.put_next_subbuffer
= put_next_subbuffer
;
1736 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
1743 * Don't create anything if this is set for streaming or if there is
1744 * no current trace chunk on the parent channel.
1746 if (stream
->net_seq_idx
== (uint64_t) -1ULL && stream
->chan
->monitor
&&
1747 stream
->chan
->trace_chunk
) {
1748 ret
= consumer_stream_create_output_files(stream
, true);
1754 if (stream
->output
== LTTNG_EVENT_MMAP
) {
1755 /* get the len of the mmap region */
1756 unsigned long mmap_len
;
1758 ret
= kernctl_get_mmap_len(stream
->wait_fd
, &mmap_len
);
1760 PERROR("kernctl_get_mmap_len");
1761 goto error_close_fd
;
1763 stream
->mmap_len
= (size_t) mmap_len
;
1765 stream
->mmap_base
= mmap(NULL
, stream
->mmap_len
, PROT_READ
,
1766 MAP_PRIVATE
, stream
->wait_fd
, 0);
1767 if (stream
->mmap_base
== MAP_FAILED
) {
1768 PERROR("Error mmaping");
1770 goto error_close_fd
;
1774 ret
= lttng_kconsumer_set_stream_ops(stream
);
1776 goto error_close_fd
;
1779 /* we return 0 to let the library handle the FD internally */
1783 if (stream
->out_fd
>= 0) {
1786 err
= close(stream
->out_fd
);
1788 stream
->out_fd
= -1;
1795 * Check if data is still being extracted from the buffers for a specific
1796 * stream. Consumer data lock MUST be acquired before calling this function
1797 * and the stream lock.
1799 * Return 1 if the traced data are still getting read else 0 meaning that the
1800 * data is available for trace viewer reading.
1802 int lttng_kconsumer_data_pending(struct lttng_consumer_stream
*stream
)
1808 if (stream
->endpoint_status
!= CONSUMER_ENDPOINT_ACTIVE
) {
1813 ret
= kernctl_get_next_subbuf(stream
->wait_fd
);
1815 /* There is still data so let's put back this subbuffer. */
1816 ret
= kernctl_put_subbuf(stream
->wait_fd
);
1818 ret
= 1; /* Data is pending */
1822 /* Data is NOT pending and ready to be read. */