X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=03bc5fc5e84b2487f85ba417c801ffbd2759c854;hb=8d18bcaede97b6d87e434310da9c13d269983c77;hp=832c44371455de237d8b7e8da77ad2440d0441fe;hpb=6f1177cf8de79015ed38f48749029c81240b0eb6;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 832c44371..03bc5fc5e 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -47,6 +47,7 @@ #include #include #include +#include #include "ust-consumer.h" @@ -1240,7 +1241,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + read_len = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { @@ -2447,7 +2448,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) assert(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); - ret = -1; + ret = write_len; goto end; } stream->ust_metadata_pushed += write_len; @@ -2793,28 +2794,88 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { int ret; + bool cache_empty; + bool got_subbuffer; + bool coherent; + bool buffer_empty; + unsigned long consumed_pos, produced_pos; - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - ret = commit_one_metadata_packet(stream); - if (ret < 0) { - goto end; - } else if (ret == 0) { - /* Not an error, the cache is empty. */ - ret = -ENODATA; - goto end; + do { + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret == 0) { + got_subbuffer = true; + } else { + got_subbuffer = false; + if (ret != -EAGAIN) { + /* Fatal error. */ + goto end; + } } - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - goto end; + /* + * Determine if the cache is empty and ensure that a sub-buffer + * is made available if the cache is not empty. + */ + if (!got_subbuffer) { + ret = commit_one_metadata_packet(stream); + if (ret < 0 && ret != -ENOBUFS) { + goto end; + } else if (ret == 0) { + /* Not an error, the cache is empty. */ + cache_empty = true; + ret = -ENODATA; + goto end; + } else { + cache_empty = false; + } + } else { + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + cache_empty = stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed; + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } - } + } while (!got_subbuffer); + /* Populate sub-buffer infos and view. */ ret = get_next_subbuffer_common(stream, subbuffer); if (ret) { goto end; } + + ret = lttng_ustconsumer_sample_snapshot_positions(stream); + if (ret < 0) { + /* + * -EAGAIN is not expected since we got a sub-buffer and haven't + * pushed the consumption position yet (on put_next). + */ + PERROR("Failed to take a snapshot of metadata buffer positions"); + goto end; + } + + ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret) { + PERROR("Failed to get metadata consumed position"); + goto end; + } + + ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret) { + PERROR("Failed to get metadata produced position"); + goto end; + } + + /* Last sub-buffer of the ring buffer ? */ + buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos; + + /* + * The sessiond registry lock ensures that coherent units of metadata + * are pushed to the consumer daemon at once. Hence, if a sub-buffer is + * acquired, the cache is empty, and it is the only available sub-buffer + * available, it is safe to assume that it is "coherent". + */ + coherent = got_subbuffer && cache_empty && buffer_empty; + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); end: return ret; } @@ -2834,9 +2895,11 @@ static int signal_metadata(struct lttng_consumer_stream *stream, return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } -static void lttng_ustconsumer_set_stream_ops( +static int lttng_ustconsumer_set_stream_ops( struct lttng_consumer_stream *stream) { + int ret = 0; + stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; if (stream->metadata_flag) { stream->read_subbuffer_ops.get_next_subbuffer = @@ -2845,7 +2908,14 @@ static void lttng_ustconsumer_set_stream_ops( extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = metadata_stream_reset_cache; - stream->read_subbuffer_ops.on_sleep = signal_metadata; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.on_sleep = signal_metadata; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } } else { stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer; @@ -2859,6 +2929,8 @@ static void lttng_ustconsumer_set_stream_ops( } stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; } /*