Fix: consumer: Move sanity check within `consumer_subbuffer` functions
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index b262b54d56868c27d2704b7cadeb52880c3ec40e..47f2eb2171fa2883d84ce8a7dfe8830fa10134fb 100644 (file)
@@ -69,8 +69,7 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
        uint64_t sequence_number;
-       const uint64_t discarded_events =
-                       LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number);
+       const uint64_t discarded_events = subbuf->info.data.events_discarded;
 
        if (!subbuf->info.data.sequence_number.is_set) {
                /* Command not supported by the tracer. */
@@ -152,9 +151,40 @@ static ssize_t consumer_stream_consume_mmap(
        const unsigned long padding_size =
                        subbuffer->info.data.padded_subbuf_size -
                        subbuffer->info.data.subbuf_size;
-
-       return lttng_consumer_on_read_subbuffer_mmap(
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap(
                        stream, &subbuffer->buffer.buffer, padding_size);
+
+       if (stream->net_seq_idx == -1ULL) {
+               /*
+                * When writing on disk, check that only the subbuffer (no
+                * padding) was written to disk.
+                */
+               if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+                       DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.padded_subbuf_size);
+               }
+       } else {
+               /*
+                * When streaming over the network, check that the entire
+                * subbuffer including padding was successfully written.
+                */
+               if (written_bytes != subbuffer->info.data.subbuf_size) {
+                       DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.subbuf_size);
+               }
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass
+        * it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading mmap subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -162,8 +192,24 @@ static ssize_t consumer_stream_consume_splice(
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
-       return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
-                       subbuffer->info.data.padded_subbuf_size, 0);
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice(
+                       ctx, stream, subbuffer->info.data.padded_subbuf_size, 0);
+
+       if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+               DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)",
+                               written_bytes,
+                               subbuffer->info.data.padded_subbuf_size);
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_splice()` returned an error,
+        * pass it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading splice subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static int consumer_stream_send_index(
@@ -197,6 +243,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
+       enum sync_metadata_status status;
 
        assert(metadata);
        assert(metadata->metadata_flag);
@@ -244,7 +291,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                        /*
                         * Empty the metadata cache and flush the current stream.
                         */
-                       ret = lttng_kconsumer_sync_metadata(metadata);
+                       status = lttng_kconsumer_sync_metadata(metadata);
                        break;
                case LTTNG_CONSUMER32_UST:
                case LTTNG_CONSUMER64_UST:
@@ -252,18 +299,23 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                         * Ask the sessiond if we have new metadata waiting and update the
                         * consumer metadata cache.
                         */
-                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+                       status = lttng_ustconsumer_sync_metadata(ctx, metadata);
                        break;
                default:
-                       assert(0);
-                       ret = -1;
-                       break;
+                       abort();
                }
-               /*
-                * Error or no new metadata, we exit here.
-                */
-               if (ret <= 0 || ret == ENODATA) {
+
+               switch (status) {
+               case SYNC_METADATA_STATUS_NEW_DATA:
+                       break;
+               case SYNC_METADATA_STATUS_NO_DATA:
+                       ret = 0;
+                       goto end_unlock_mutex;
+               case SYNC_METADATA_STATUS_ERROR:
+                       ret = -1;
                        goto end_unlock_mutex;
+               default:
+                       abort();
                }
 
                /*
@@ -285,7 +337,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                 */
                pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
                pthread_mutex_unlock(&metadata->metadata_rdv_lock);
-       } while (ret == EAGAIN);
+       } while (status == SYNC_METADATA_STATUS_NEW_DATA);
 
        /* Success */
        return 0;
@@ -447,6 +499,8 @@ struct lttng_consumer_stream *consumer_stream_create(
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        stream->rotate_position = -1ULL;
+       /* Buffer is created with an open packet. */
+       stream->opened_packet_in_current_trace_chunk = true;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
This page took 0.02555 seconds and 4 git commands to generate.