X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-metadata-cache.c;h=a5943fd33ae6f8f858af5c61e3504a70dbceb2e1;hb=95671f5349e87cdd2ea6cb47243608e9368ab8d5;hp=1038b84e63fd58cb3fb2dc49a76a6e1a3fc7cdc8;hpb=fa29bfbf73e837b936d80b4d5a1206dfb8496f07;p=lttng-tools.git diff --git a/src/common/consumer/consumer-metadata-cache.c b/src/common/consumer/consumer-metadata-cache.c index 1038b84e6..a5943fd33 100644 --- a/src/common/consumer/consumer-metadata-cache.c +++ b/src/common/consumer/consumer-metadata-cache.c @@ -188,17 +188,15 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel) /* * Check if the cache is flushed up to the offset passed in parameter. * - * Return 0 if everything has been flushed, 1 if there is data not flushed. + * Return true if everything has been flushed, false if there is data not flushed. */ -int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, +static +bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel, uint64_t offset, int timer) { - int ret = 0; + bool done_flushing = false; struct lttng_consumer_stream *metadata_stream; - assert(channel); - assert(channel->metadata_cache); - /* * If not called from a timer handler, we have to take the * channel lock to be mutually exclusive with channel teardown. @@ -216,7 +214,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, * Having no metadata stream means the channel is being destroyed so there * is no cache to flush anymore. */ - ret = 0; + done_flushing = true; goto end_unlock_channel; } @@ -224,23 +222,56 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, pthread_mutex_lock(&channel->metadata_cache->lock); if (metadata_stream->ust_metadata_pushed >= offset) { - ret = 0; + done_flushing = true; } else if (channel->metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { /* An inactive endpoint means we don't have to flush anymore. */ - ret = 0; + done_flushing = true; } else { /* Still not completely flushed. */ - ret = 1; + done_flushing = false; } pthread_mutex_unlock(&channel->metadata_cache->lock); pthread_mutex_unlock(&metadata_stream->lock); + end_unlock_channel: pthread_mutex_unlock(&channel->timer_lock); if (!timer) { pthread_mutex_unlock(&channel->lock); } - return ret; + return done_flushing; +} + +/* + * Wait until the cache is flushed up to the offset passed in parameter or the + * metadata stream has been destroyed. + */ +void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel, + uint64_t offset, bool invoked_by_timer) +{ + assert(channel); + assert(channel->metadata_cache); + + if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) { + return; + } + + /* Metadata cache is not currently flushed, wait on wait queue. */ + for (;;) { + struct lttng_waiter waiter; + + lttng_waiter_init(&waiter); + lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter); + if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) { + /* Wake up all waiters, ourself included. */ + lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); + /* Ensure proper teardown of waiter. */ + lttng_waiter_wait(&waiter); + break; + } + + lttng_waiter_wait(&waiter); + } }