Fix: ust-consumer: metadata thread not woken-up after version change
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 82dd6bbee37be3c8ef0fac03052a12b3715b5290..81c05d2da1febfe5fff215e9917195dd4b4d8ab4 100644 (file)
@@ -1288,6 +1288,17 @@ error_unlock:
        return ret;
 }
 
+static
+void metadata_stream_reset_cache_consumed_position(
+               struct lttng_consumer_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+
+       DBG("Reset metadata cache of session %" PRIu64,
+                       stream->chan->session_id);
+       stream->ust_metadata_pushed = 0;
+}
+
 /*
  * Receive the metadata updates from the sessiond. Supports receiving
  * overlapping metadata, but is needs to always belong to a contiguous
@@ -1302,6 +1313,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
+       enum consumer_metadata_cache_write_status cache_write_status;
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
@@ -1325,9 +1337,40 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        health_code_update();
 
        pthread_mutex_lock(&channel->metadata_cache->lock);
-       ret = consumer_metadata_cache_write(channel, offset, len, version,
-                       metadata_str);
-       if (ret < 0) {
+       cache_write_status = consumer_metadata_cache_write(
+                       channel, offset, len, version, metadata_str);
+       pthread_mutex_unlock(&channel->metadata_cache->lock);
+       switch (cache_write_status) {
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
+               /*
+                * The write entirely overlapped with existing contents of the
+                * same metadata version (same content); there is nothing to do.
+                */
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
+               /*
+                * The metadata cache was invalidated (previously pushed
+                * content has been overwritten). Reset the stream's consumed
+                * metadata position to ensure the metadata poll thread consumes
+                * the whole cache.
+                */
+               pthread_mutex_lock(&channel->metadata_stream->lock);
+               metadata_stream_reset_cache_consumed_position(
+                               channel->metadata_stream);
+               pthread_mutex_unlock(&channel->metadata_stream->lock);
+               /* Fall-through. */
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
+               /*
+                * In both cases, the metadata poll thread has new data to
+                * consume.
+                */
+               ret = consumer_metadata_wakeup_pipe(channel);
+               if (ret) {
+                       ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto end_free;
+               }
+               break;
+       case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
                /*
@@ -1335,10 +1378,10 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 * not have been updated which could create an infinite loop below when
                 * waiting for the metadata cache to be flushed.
                 */
-               pthread_mutex_unlock(&channel->metadata_cache->lock);
                goto end_free;
+       default:
+               abort();
        }
-       pthread_mutex_unlock(&channel->metadata_cache->lock);
 
        if (!wait) {
                goto end_free;
@@ -2417,20 +2460,11 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
        return ustctl_stream_close_wakeup_fd(stream->ustream);
 }
 
-static
-void metadata_stream_reset_cache_consumed_position(
-               struct lttng_consumer_stream *stream)
-{
-       DBG("Reset metadata cache of session %" PRIu64,
-                       stream->chan->session_id);
-       stream->ust_metadata_pushed = 0;
-}
-
 /*
  * Write up to one packet from the metadata cache to the channel.
  *
- * Returns the number of bytes pushed in the cache, or a negative value
- * on error.
+ * Returns the number of bytes pushed from the cache into the ring buffer, or a
+ * negative value on error.
  */
 static
 int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
@@ -2514,15 +2548,13 @@ end:
  * awaiting on metadata to be pushed out.
  *
  * The RCU read side lock must be held by the caller.
- *
- * Return 0 if new metadatda is available, EAGAIN if the metadata stream
- * is empty or a negative value on error.
  */
-int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+enum sync_metadata_status lttng_ustconsumer_sync_metadata(
+               struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *metadata_stream)
 {
        int ret;
-       int retry = 0;
+       enum sync_metadata_status status;
        struct lttng_consumer_channel *metadata_channel;
 
        assert(ctx);
@@ -2537,6 +2569,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
        ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
        pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
+               status = SYNC_METADATA_STATUS_ERROR;
                goto end;
        }
 
@@ -2554,38 +2587,30 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
        if (consumer_stream_is_deleted(metadata_stream)) {
                DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
                                metadata_stream->key);
-               ret = 0;
+               status = SYNC_METADATA_STATUS_NO_DATA;
                goto end;
        }
 
        ret = commit_one_metadata_packet(metadata_stream);
-       if (ret <= 0) {
+       if (ret < 0) {
+               status = SYNC_METADATA_STATUS_ERROR;
                goto end;
        } else if (ret > 0) {
-               retry = 1;
+               status = SYNC_METADATA_STATUS_NEW_DATA;
+       } else /* ret == 0 */ {
+               status = SYNC_METADATA_STATUS_NO_DATA;
+               goto end;
        }
 
        ret = ustctl_snapshot(metadata_stream->ustream);
        if (ret < 0) {
-               if (errno != EAGAIN) {
-                       ERR("Sync metadata, taking UST snapshot");
-                       goto end;
-               }
-               DBG("No new metadata when syncing them.");
-               /* No new metadata, exit. */
-               ret = ENODATA;
+               ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret);
+               status = SYNC_METADATA_STATUS_ERROR;
                goto end;
        }
 
-       /*
-        * After this flush, we still need to extract metadata.
-        */
-       if (retry) {
-               ret = EAGAIN;
-       }
-
 end:
-       return ret;
+       return status;
 }
 
 /*
@@ -3012,6 +3037,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
        assert(stream->ustream);
+       ASSERT_LOCKED(stream->lock);
 
        DBG("UST consumer checking data pending");
 
@@ -3024,7 +3050,9 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                uint64_t contiguous, pushed;
 
                /* Ease our life a bit. */
+               pthread_mutex_lock(&stream->chan->metadata_cache->lock);
                contiguous = stream->chan->metadata_cache->max_offset;
+               pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
                pushed = stream->ust_metadata_pushed;
 
                /*
This page took 0.027437 seconds and 4 git commands to generate.