+ subbuffer->buffer.buffer = lttng_buffer_view_init(
+ addr, 0, subbuffer->info.data.padded_subbuf_size);
+ DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
+ subbuffer->info.metadata.padded_subbuf_size,
+ coherent ? "true" : "false");
+end:
+ /*
+ * The caller only expects -ENODATA when there is no data to read, but
+ * the kernel tracer returns -EAGAIN when there is currently no data
+ * for a non-finalized stream, and -ENODATA when there is no data for a
+ * finalized stream. Those can be combined into a -ENODATA return value.
+ */
+ if (ret == -EAGAIN) {
+ ret = -ENODATA;
+ }
+
+ return ret;
+}
+
+static
+int put_next_subbuffer(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ const int ret = kernctl_put_next_subbuf(stream->wait_fd);
+
+ if (ret) {
+ if (ret == -EFAULT) {
+ PERROR("Error in unreserving sub buffer");
+ } else if (ret == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
+ }
+ }
+
+ return ret;
+}
+
+static
+bool is_get_next_check_metadata_available(int tracer_fd)
+{
+ const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
+ const bool available = ret != -ENOTTY;
+
+ if (ret == 0) {
+ /* get succeeded, make sure to put the subbuffer. */
+ kernctl_put_subbuf(tracer_fd);
+ }
+
+ return available;
+}
+
+static
+int signal_metadata(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ ASSERT_LOCKED(stream->metadata_rdv_lock);
+ return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
+}
+
+static
+int lttng_kconsumer_set_stream_ops(
+ struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+
+ if (stream->metadata_flag && stream->chan->is_live) {
+ DBG("Attempting to enable metadata bucketization for live consumers");
+ if (is_get_next_check_metadata_available(stream->wait_fd)) {
+ DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_metadata_check;
+ ret = consumer_stream_enable_metadata_bucketization(
+ stream);
+ if (ret) {
+ goto end;
+ }
+ } else {
+ /*
+ * The kernel tracer version is too old to indicate
+ * when the metadata stream has reached a "coherent"
+ * (parseable) point.
+ *
+ * This means that a live viewer may see an incoherent
+ * sequence of metadata and fail to parse it.
+ */
+ WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
+ metadata_bucket_destroy(stream->metadata_bucket);
+ stream->metadata_bucket = NULL;
+ }
+
+ stream->read_subbuffer_ops.on_sleep = signal_metadata;
+ }
+
+ if (!stream->read_subbuffer_ops.get_next_subbuffer) {
+ if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_mmap;
+ } else {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_splice;
+ }
+ }
+
+ if (stream->metadata_flag) {
+ stream->read_subbuffer_ops.extract_subbuffer_info =
+ extract_metadata_subbuffer_info;
+ } else {
+ stream->read_subbuffer_ops.extract_subbuffer_info =
+ extract_data_subbuffer_info;
+ if (stream->chan->is_live) {
+ stream->read_subbuffer_ops.send_live_beacon =
+ consumer_flush_kernel_index;
+ }
+ }
+
+ stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;