X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=47f2eb2171fa2883d84ce8a7dfe8830fa10134fb;hb=56c86e0a3f16a2f7fd3e473dd8c9d753a13e86ee;hp=b262b54d56868c27d2704b7cadeb52880c3ec40e;hpb=3a2dcb5e6b59f5985ce114f5cec631a4ec60b9eb;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index b262b54d5..47f2eb217 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -69,8 +69,7 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, { int ret = 0; uint64_t sequence_number; - const uint64_t discarded_events = - LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number); + const uint64_t discarded_events = subbuf->info.data.events_discarded; if (!subbuf->info.data.sequence_number.is_set) { /* Command not supported by the tracer. */ @@ -152,9 +151,40 @@ static ssize_t consumer_stream_consume_mmap( const unsigned long padding_size = subbuffer->info.data.padded_subbuf_size - subbuffer->info.data.subbuf_size; - - return lttng_consumer_on_read_subbuffer_mmap( + const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuffer->buffer.buffer, padding_size); + + if (stream->net_seq_idx == -1ULL) { + /* + * When writing on disk, check that only the subbuffer (no + * padding) was written to disk. + */ + if (written_bytes != subbuffer->info.data.padded_subbuf_size) { + DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)", + written_bytes, + subbuffer->info.data.padded_subbuf_size); + } + } else { + /* + * When streaming over the network, check that the entire + * subbuffer including padding was successfully written. + */ + if (written_bytes != subbuffer->info.data.subbuf_size) { + DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)", + written_bytes, + subbuffer->info.data.subbuf_size); + } + } + + /* + * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass + * it along to the caller, else return zero. + */ + if (written_bytes < 0) { + ERR("Error reading mmap subbuffer: %zd", written_bytes); + } + + return written_bytes; } static ssize_t consumer_stream_consume_splice( @@ -162,8 +192,24 @@ static ssize_t consumer_stream_consume_splice( struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer) { - return lttng_consumer_on_read_subbuffer_splice(ctx, stream, - subbuffer->info.data.padded_subbuf_size, 0); + const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice( + ctx, stream, subbuffer->info.data.padded_subbuf_size, 0); + + if (written_bytes != subbuffer->info.data.padded_subbuf_size) { + DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)", + written_bytes, + subbuffer->info.data.padded_subbuf_size); + } + + /* + * If `lttng_consumer_on_read_subbuffer_splice()` returned an error, + * pass it along to the caller, else return zero. + */ + if (written_bytes < 0) { + ERR("Error reading splice subbuffer: %zd", written_bytes); + } + + return written_bytes; } static int consumer_stream_send_index( @@ -197,6 +243,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, struct lttng_consumer_local_data *ctx) { int ret; + enum sync_metadata_status status; assert(metadata); assert(metadata->metadata_flag); @@ -244,7 +291,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, /* * Empty the metadata cache and flush the current stream. */ - ret = lttng_kconsumer_sync_metadata(metadata); + status = lttng_kconsumer_sync_metadata(metadata); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: @@ -252,18 +299,23 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_sync_metadata(ctx, metadata); + status = lttng_ustconsumer_sync_metadata(ctx, metadata); break; default: - assert(0); - ret = -1; - break; + abort(); } - /* - * Error or no new metadata, we exit here. - */ - if (ret <= 0 || ret == ENODATA) { + + switch (status) { + case SYNC_METADATA_STATUS_NEW_DATA: + break; + case SYNC_METADATA_STATUS_NO_DATA: + ret = 0; + goto end_unlock_mutex; + case SYNC_METADATA_STATUS_ERROR: + ret = -1; goto end_unlock_mutex; + default: + abort(); } /* @@ -285,7 +337,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, */ pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); pthread_mutex_unlock(&metadata->metadata_rdv_lock); - } while (ret == EAGAIN); + } while (status == SYNC_METADATA_STATUS_NEW_DATA); /* Success */ return 0; @@ -447,6 +499,8 @@ struct lttng_consumer_stream *consumer_stream_create( stream->index_file = NULL; stream->last_sequence_number = -1ULL; stream->rotate_position = -1ULL; + /* Buffer is created with an open packet. */ + stream->opened_packet_in_current_trace_chunk = true; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL);