Copyright ownership transfer
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index 8c21d6ae1edd3afe71967d6bf5e5cb32cb998675..a23322c8d84b584dd1c78f2d282063dd4d2f1a64 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
  * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
  *
 #include <unistd.h>
 
 #include <common/common.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/metadata-bucket.h>
+#include <common/consumer/metadata-bucket.h>
 #include <common/index/index.h>
 #include <common/kernel-consumer/kernel-consumer.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/macros.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 #include <common/utils.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/kernel-ctl/kernel-ctl.h>
 
 #include "consumer-stream.h"
 
@@ -52,6 +56,12 @@ static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream
        pthread_mutex_unlock(&stream->chan->lock);
 }
 
+static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+       ASSERT_LOCKED(stream->chan->lock);
+}
+
 static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
 {
        consumer_stream_data_lock_all(stream);
@@ -64,6 +74,12 @@ static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *st
        consumer_stream_data_unlock_all(stream);
 }
 
+static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->metadata_rdv_lock);
+       consumer_stream_data_assert_locked_all(stream);
+}
+
 /* Only used for data streams. */
 static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuf)
@@ -152,9 +168,40 @@ static ssize_t consumer_stream_consume_mmap(
        const unsigned long padding_size =
                        subbuffer->info.data.padded_subbuf_size -
                        subbuffer->info.data.subbuf_size;
-
-       return lttng_consumer_on_read_subbuffer_mmap(
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap(
                        stream, &subbuffer->buffer.buffer, padding_size);
+
+       if (stream->net_seq_idx == -1ULL) {
+               /*
+                * When writing on disk, check that only the subbuffer (no
+                * padding) was written to disk.
+                */
+               if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+                       DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.padded_subbuf_size);
+               }
+       } else {
+               /*
+                * When streaming over the network, check that the entire
+                * subbuffer including padding was successfully written.
+                */
+               if (written_bytes != subbuffer->info.data.subbuf_size) {
+                       DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.subbuf_size);
+               }
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass
+        * it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading mmap subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -162,8 +209,24 @@ static ssize_t consumer_stream_consume_splice(
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
-       return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
-                       subbuffer->info.data.padded_subbuf_size, 0);
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice(
+                       ctx, stream, subbuffer->info.data.padded_subbuf_size, 0);
+
+       if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+               DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)",
+                               written_bytes,
+                               subbuffer->info.data.padded_subbuf_size);
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_splice()` returned an error,
+        * pass it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading splice subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static int consumer_stream_send_index(
@@ -322,7 +385,7 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
        assert(ctx);
 
        /* Ease our life a bit. */
-       ht = consumer_data.stream_list_ht;
+       ht = the_consumer_data.stream_list_ht;
 
        rcu_read_lock();
 
@@ -357,6 +420,7 @@ static int consumer_stream_sync_metadata_index(
                const struct stream_subbuffer *subbuffer,
                struct lttng_consumer_local_data *ctx)
 {
+       bool missed_metadata_flush;
        int ret;
 
        /* Block until all the metadata is sent. */
@@ -369,18 +433,34 @@ static int consumer_stream_sync_metadata_index(
 
        pthread_mutex_lock(&stream->metadata_timer_lock);
        stream->waiting_on_metadata = false;
-       if (stream->missed_metadata_flush) {
+       missed_metadata_flush = stream->missed_metadata_flush;
+       if (missed_metadata_flush) {
                stream->missed_metadata_flush = false;
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
-               (void) stream->read_subbuffer_ops.send_live_beacon(stream);
-       } else {
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
        }
+       pthread_mutex_unlock(&stream->metadata_timer_lock);
        if (ret < 0) {
                goto end;
        }
 
        ret = consumer_stream_send_index(stream, subbuffer, ctx);
+       /*
+        * Send the live inactivity beacon to handle the situation where
+        * the live timer is prevented from sampling this stream
+        * because the stream lock was being held while this stream is
+        * waiting on metadata. This ensures live viewer progress in the
+        * unlikely scenario where a live timer would be prevented from
+        * locking a stream lock repeatedly due to a steady flow of
+        * incoming metadata, for a stream which is mostly inactive.
+        *
+        * It is important to send the inactivity beacon packet to
+        * relayd _after_ sending the index associated with the data
+        * that was just sent, otherwise this can cause live viewers to
+        * observe timestamps going backwards between an inactivity
+        * beacon and a following trace packet.
+        */
+       if (missed_metadata_flush) {
+               (void) stream->read_subbuffer_ops.send_live_beacon(stream);
+       }
 end:
        return ret;
 }
@@ -582,13 +662,14 @@ struct lttng_consumer_stream *consumer_stream_create(
                goto end;
        }
 
+       rcu_read_lock();
+
        if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
                ERR("Failed to acquire trace chunk reference during the creation of a stream");
                ret = -1;
                goto error;
        }
 
-       rcu_read_lock();
        stream->chan = channel;
        stream->key = stream_key;
        stream->trace_chunk = trace_chunk;
@@ -664,6 +745,8 @@ struct lttng_consumer_stream *consumer_stream_create(
                                consumer_stream_metadata_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_metadata_unlock_all;
+               stream->read_subbuffer_ops.assert_locked =
+                               consumer_stream_metadata_assert_locked_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                metadata_stream_check_version;
        } else {
@@ -690,6 +773,8 @@ struct lttng_consumer_stream *consumer_stream_create(
                stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_data_unlock_all;
+               stream->read_subbuffer_ops.assert_locked =
+                               consumer_stream_data_assert_locked_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                consumer_stream_update_stats;
        }
@@ -770,7 +855,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
-       switch (consumer_data.type) {
+       switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                if (stream->mmap_base != NULL) {
                        ret = munmap(stream->mmap_base, stream->mmap_len);
@@ -878,19 +963,19 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream,
         * that did not add the stream to a (all) hash table. Same goes for the
         * next call ht del call.
         */
-       (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
+       (void) lttng_ht_del(the_consumer_data.stream_per_chan_id_ht, &iter);
 
        /* Delete from the global stream list. */
        iter.iter.node = &stream->node_session_id.node;
        /* See the previous ht del on why we ignore the returned value. */
-       (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
+       (void) lttng_ht_del(the_consumer_data.stream_list_ht, &iter);
 
        rcu_read_unlock();
 
        if (!stream->metadata_flag) {
                /* Decrement the stream count of the global consumer data. */
-               assert(consumer_data.stream_count > 0);
-               consumer_data.stream_count--;
+               assert(the_consumer_data.stream_count > 0);
+               the_consumer_data.stream_count--;
        }
 }
 
@@ -912,7 +997,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
 {
        assert(stream);
 
-       switch (consumer_data.type) {
+       switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
        case LTTNG_CONSUMER32_UST:
@@ -984,7 +1069,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                 * stream thus being globally visible.
                 */
                if (stream->globally_visible) {
-                       pthread_mutex_lock(&consumer_data.lock);
+                       pthread_mutex_lock(&the_consumer_data.lock);
                        pthread_mutex_lock(&stream->chan->lock);
                        pthread_mutex_lock(&stream->lock);
                        /* Remove every reference of the stream in the consumer. */
@@ -996,11 +1081,11 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                        free_chan = unref_channel(stream);
 
                        /* Indicates that the consumer data state MUST be updated after this. */
-                       consumer_data.need_update = 1;
+                       the_consumer_data.need_update = 1;
 
                        pthread_mutex_unlock(&stream->lock);
                        pthread_mutex_unlock(&stream->chan->lock);
-                       pthread_mutex_unlock(&consumer_data.lock);
+                       pthread_mutex_unlock(&the_consumer_data.lock);
                } else {
                        /*
                         * If the stream is not visible globally, this needs to be done
@@ -1103,16 +1188,16 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
                        goto end;
                }
                stream->out_fd = -1;
-        }
+       }
 
        DBG("Opening stream output file \"%s\"", stream_path);
        chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
                        flags, mode, &stream->out_fd, false);
-        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->name);
                ret = -1;
                goto end;
-        }
+       }
 
        if (!stream->metadata_flag && (create_index || stream->index_file)) {
                if (stream->index_file) {
@@ -1242,7 +1327,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
 
-       switch (consumer_data.type) {
+       switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                if (producer_active) {
                        ret = kernctl_buffer_flush(stream->wait_fd);
@@ -1270,7 +1355,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream,
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
+               ret = lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
                break;
        default:
                ERR("Unknown consumer_data type");
This page took 0.051653 seconds and 4 git commands to generate.