X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=6fe91c2371f16d51836cce898a4eab4bc0cd8ff3;hb=47287c8c2ec367b9718dc1eb0f5aef4f492637af;hp=1e4b9c92a8813746a3e988a18b7c49d6c22beb8c;hpb=514775d9bca89b3bd072c58e779201682304c57d;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 1e4b9c92a..6fe91c237 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2013 David Goulet * @@ -14,15 +14,19 @@ #include #include +#include +#include +#include +#include +#include +#include #include #include +#include +#include #include #include #include -#include -#include -#include -#include #include "consumer-stream.h" @@ -52,6 +56,12 @@ static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream pthread_mutex_unlock(&stream->chan->lock); } +static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + ASSERT_LOCKED(stream->chan->lock); +} + static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream) { consumer_stream_data_lock_all(stream); @@ -64,6 +74,12 @@ static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *st consumer_stream_data_unlock_all(stream); } +static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->metadata_rdv_lock); + consumer_stream_data_assert_locked_all(stream); +} + /* Only used for data streams. */ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuf) @@ -369,7 +385,7 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, assert(ctx); /* Ease our life a bit. */ - ht = consumer_data.stream_list_ht; + ht = the_consumer_data.stream_list_ht; rcu_read_lock(); @@ -404,6 +420,7 @@ static int consumer_stream_sync_metadata_index( const struct stream_subbuffer *subbuffer, struct lttng_consumer_local_data *ctx) { + bool missed_metadata_flush; int ret; /* Block until all the metadata is sent. */ @@ -416,18 +433,34 @@ static int consumer_stream_sync_metadata_index( pthread_mutex_lock(&stream->metadata_timer_lock); stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { + missed_metadata_flush = stream->missed_metadata_flush; + if (missed_metadata_flush) { stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) stream->read_subbuffer_ops.send_live_beacon(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); } + pthread_mutex_unlock(&stream->metadata_timer_lock); if (ret < 0) { goto end; } ret = consumer_stream_send_index(stream, subbuffer, ctx); + /* + * Send the live inactivity beacon to handle the situation where + * the live timer is prevented from sampling this stream + * because the stream lock was being held while this stream is + * waiting on metadata. This ensures live viewer progress in the + * unlikely scenario where a live timer would be prevented from + * locking a stream lock repeatedly due to a steady flow of + * incoming metadata, for a stream which is mostly inactive. + * + * It is important to send the inactivity beacon packet to + * relayd _after_ sending the index associated with the data + * that was just sent, otherwise this can cause live viewers to + * observe timestamps going backwards between an inactivity + * beacon and a following trace packet. + */ + if (missed_metadata_flush) { + (void) stream->read_subbuffer_ops.send_live_beacon(stream); + } end: return ret; } @@ -629,13 +662,14 @@ struct lttng_consumer_stream *consumer_stream_create( goto end; } + rcu_read_lock(); + if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { ERR("Failed to acquire trace chunk reference during the creation of a stream"); ret = -1; goto error; } - rcu_read_lock(); stream->chan = channel; stream->key = stream_key; stream->trace_chunk = trace_chunk; @@ -711,6 +745,8 @@ struct lttng_consumer_stream *consumer_stream_create( consumer_stream_metadata_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_metadata_unlock_all; + stream->read_subbuffer_ops.assert_locked = + consumer_stream_metadata_assert_locked_all; stream->read_subbuffer_ops.pre_consume_subbuffer = metadata_stream_check_version; } else { @@ -737,6 +773,8 @@ struct lttng_consumer_stream *consumer_stream_create( stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_data_unlock_all; + stream->read_subbuffer_ops.assert_locked = + consumer_stream_data_assert_locked_all; stream->read_subbuffer_ops.pre_consume_subbuffer = consumer_stream_update_stats; } @@ -817,7 +855,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) assert(stream); - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: if (stream->mmap_base != NULL) { ret = munmap(stream->mmap_base, stream->mmap_len); @@ -925,19 +963,19 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, * that did not add the stream to a (all) hash table. Same goes for the * next call ht del call. */ - (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter); + (void) lttng_ht_del(the_consumer_data.stream_per_chan_id_ht, &iter); /* Delete from the global stream list. */ iter.iter.node = &stream->node_session_id.node; /* See the previous ht del on why we ignore the returned value. */ - (void) lttng_ht_del(consumer_data.stream_list_ht, &iter); + (void) lttng_ht_del(the_consumer_data.stream_list_ht, &iter); rcu_read_unlock(); if (!stream->metadata_flag) { /* Decrement the stream count of the global consumer data. */ - assert(consumer_data.stream_count > 0); - consumer_data.stream_count--; + assert(the_consumer_data.stream_count > 0); + the_consumer_data.stream_count--; } } @@ -959,7 +997,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) { assert(stream); - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER32_UST: @@ -1031,7 +1069,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, * stream thus being globally visible. */ if (stream->globally_visible) { - pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&the_consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); /* Remove every reference of the stream in the consumer. */ @@ -1043,16 +1081,17 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, free_chan = unref_channel(stream); /* Indicates that the consumer data state MUST be updated after this. */ - consumer_data.need_update = 1; + the_consumer_data.need_update = 1; pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); - pthread_mutex_unlock(&consumer_data.lock); + pthread_mutex_unlock(&the_consumer_data.lock); } else { /* * If the stream is not visible globally, this needs to be done * outside of the consumer data lock section. */ + destroy_close_stream(stream); free_chan = unref_channel(stream); } @@ -1150,16 +1189,16 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, goto end; } stream->out_fd = -1; - } + } DBG("Opening stream output file \"%s\"", stream_path); chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, flags, mode, &stream->out_fd, false); - if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to open stream file \"%s\"", stream->name); ret = -1; goto end; - } + } if (!stream->metadata_flag && (create_index || stream->index_file)) { if (stream->index_file) { @@ -1289,7 +1328,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, { int ret = 0; - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: if (producer_active) { ret = kernctl_buffer_flush(stream->wait_fd); @@ -1317,7 +1356,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_flush_buffer(stream, (int) producer_active); + ret = lttng_ustconsumer_flush_buffer(stream, (int) producer_active); break; default: ERR("Unknown consumer_data type");