Fix: sessiond vs consumerd push/get metadata deadlock
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 19 Aug 2015 21:44:59 +0000 (14:44 -0700)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 24 Sep 2015 02:02:57 +0000 (22:02 -0400)
We need to unlock the registry while we push metadata to break a
circular dependency between the consumerd metadata lock and the sessiond
registry lock. Indeed, pushing metadata to the consumerd awaits that it
gets pushed all the way to relayd, but doing so requires grabbing the
metadata lock. If a concurrent metadata request is being performed by
consumerd, this can try to grab the registry lock on the sessiond while
holding the metadata lock on the consumer daemon. Those push and pull
schemes are performed on two different bidirectionnal communication
sockets.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/ust-app.c
src/common/consumer-metadata-cache.c
src/common/consumer-metadata-cache.h
src/common/consumer-timer.c
src/common/consumer-timer.h
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 9d262184c237041aafd0e0d08c04daf7b15c0526..a3d99b93e2c452147b57be4a5490b44950f7fc2c 100644 (file)
@@ -439,17 +439,20 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
 {
        int ret;
        char *metadata_str = NULL;
-       size_t len, offset;
+       size_t len, offset, new_metadata_len_sent;
        ssize_t ret_val;
+       uint64_t metadata_key;
 
        assert(registry);
        assert(socket);
 
+       metadata_key = registry->metadata_key;
+
        /*
         * Means that no metadata was assigned to the session. This can
         * happens if no start has been done previously.
         */
-       if (!registry->metadata_key) {
+       if (!metadata_key) {
                return 0;
        }
 
@@ -467,6 +470,7 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
 
        offset = registry->metadata_len_sent;
        len = registry->metadata_len - registry->metadata_len_sent;
+       new_metadata_len_sent = registry->metadata_len;
        if (len == 0) {
                DBG3("No metadata to push for metadata key %" PRIu64,
                                registry->metadata_key);
@@ -485,13 +489,26 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
                ret_val = -ENOMEM;
                goto error;
        }
-       /* Copy what we haven't send out. */
+       /* Copy what we haven't sent out. */
        memcpy(metadata_str, registry->metadata + offset, len);
-       registry->metadata_len_sent += len;
 
 push_data:
-       ret = consumer_push_metadata(socket, registry->metadata_key,
+       pthread_mutex_unlock(&registry->lock);
+       /*
+        * We need to unlock the registry while we push metadata to
+        * break a circular dependency between the consumerd metadata
+        * lock and the sessiond registry lock. Indeed, pushing metadata
+        * to the consumerd awaits that it gets pushed all the way to
+        * relayd, but doing so requires grabbing the metadata lock. If
+        * a concurrent metadata request is being performed by
+        * consumerd, this can try to grab the registry lock on the
+        * sessiond while holding the metadata lock on the consumer
+        * daemon. Those push and pull schemes are performed on two
+        * different bidirectionnal communication sockets.
+        */
+       ret = consumer_push_metadata(socket, metadata_key,
                        metadata_str, len, offset);
+       pthread_mutex_lock(&registry->lock);
        if (ret < 0) {
                /*
                 * There is an acceptable race here between the registry
@@ -509,17 +526,29 @@ push_data:
                 */
                if (ret == -LTTCOMM_CONSUMERD_CHANNEL_FAIL) {
                        ret = 0;
+               } else {
+                       ERR("Error pushing metadata to consumer");
                }
-
-               /*
-                * Update back the actual metadata len sent since it
-                * failed here.
-                */
-               registry->metadata_len_sent -= len;
                ret_val = ret;
                goto error_push;
+       } else {
+               /*
+                * Metadata may have been concurrently pushed, since
+                * we're not holding the registry lock while pushing to
+                * consumer.  This is handled by the fact that we send
+                * the metadata content, size, and the offset at which
+                * that metadata belongs. This may arrive out of order
+                * on the consumer side, and the consumer is able to
+                * deal with overlapping fragments. The consumer
+                * supports overlapping fragments, which must be
+                * contiguous starting from offset 0. We keep the
+                * largest metadata_len_sent value of the concurrent
+                * send.
+                */
+               registry->metadata_len_sent =
+                       max_t(size_t, registry->metadata_len_sent,
+                               new_metadata_len_sent);
        }
-
        free(metadata_str);
        return len;
 
index 0f086fe9643218ba91837446b031154c91453e58..edcd2b11f197d0772e9183b0e46837323e65df97 100644 (file)
@@ -72,8 +72,8 @@ end:
 
 /*
  * Write metadata to the cache, extend the cache if necessary. We support
- * non-contiguous updates but not overlapping ones. If there is contiguous
- * metadata in the cache, we send it to the ring buffer. The metadata cache
+ * overlapping updates, but they need to be contiguous. Send the
+ * contiguous metadata in cache to the ring buffer. The metadata cache
  * lock MUST be acquired to write in the cache.
  *
  * Return 0 on success, a negative value on error.
@@ -101,15 +101,10 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
        }
 
        memcpy(cache->data + offset, data, len);
-       cache->total_bytes_written += len;
        if (offset + len > cache->max_offset) {
-               cache->max_offset = offset + len;
-       }
-
-       if (cache->max_offset == cache->total_bytes_written) {
                char dummy = 'c';
 
-               cache->contiguous = cache->max_offset;
+               cache->max_offset = offset + len;
                if (channel->monitor) {
                        size_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
                                        &dummy, 1);
index aaf9f24d2a5a978b4c0024d290be86910a1ae0df..e7aba4ac923778fea3594ce9161390bd1286450a 100644 (file)
 struct consumer_metadata_cache {
        char *data;
        uint64_t cache_alloc_size;
-       /*
-        * How many bytes from the cache are written contiguously.
-        */
-       uint64_t contiguous;
-       /*
-        * How many bytes are written in the buffer (excluding the wholes).
-        */
-       uint64_t total_bytes_written;
        /*
         * The upper-limit of data written inside the buffer.
         *
         * With the total_bytes_written it allows us to keep track of when the
         * cache contains contiguous metadata ready to be sent to the RB.
-        * The metadata cache updates must not overlap.
+        * All cached data is contiguous.
         */
        uint64_t max_offset;
        /*
index 1408052431aac3613db4e912c505320dd497e1ba..2687eaffd7d556bd00686737a8190d883621979e 100644 (file)
@@ -132,78 +132,103 @@ error:
        return ret;
 }
 
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
 {
        uint64_t ts, stream_id;
        int ret;
 
-       /*
-        * While holding the stream mutex, try to take a snapshot, if it
-        * succeeds, it means that data is ready to be sent, just let the data
-        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
-        * means that there is no data to read after the flush, so we can
-        * safely send the empty index.
-        */
-       pthread_mutex_lock(&stream->lock);
        ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_buffer_flush(stream->wait_fd);
        if (ret < 0) {
                ERR("Failed to flush kernel stream");
-               goto error_unlock;
+               goto end;
        }
        ret = kernctl_snapshot(stream->wait_fd);
        if (ret < 0) {
                if (errno != EAGAIN && errno != ENODATA) {
                        PERROR("live timer kernel snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
                }
                ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
                if (ret < 0) {
                        PERROR("kernctl_get_stream_id");
-                       goto error_unlock;
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
                ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
-
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
-static int check_ust_stream(struct lttng_consumer_stream *stream)
+static int check_kernel_stream(struct lttng_consumer_stream *stream)
 {
-       uint64_t ts, stream_id;
        int ret;
 
-       assert(stream);
-       assert(stream->ustream);
        /*
         * While holding the stream mutex, try to take a snapshot, if it
         * succeeds, it means that data is ready to be sent, just let the data
         * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
         * means that there is no data to read after the flush, so we can
         * safely send the empty index.
+        *
+        * Doing a trylock and checking if waiting on metadata if
+        * trylock fails. Bail out of the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
         */
-       pthread_mutex_lock(&stream->lock);
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_kernel_index(stream);
+       pthread_mutex_unlock(&stream->lock);
+end:
+       return ret;
+}
+
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
+{
+       uint64_t ts, stream_id;
+       int ret;
+
        ret = cds_lfht_is_node_deleted(&stream->node.node);
        if (ret) {
-               goto error_unlock;
+               goto end;
        }
 
        ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
        if (ret < 0) {
                ERR("Failed to get the current timestamp");
-               goto error_unlock;
+               goto end;
        }
        lttng_ustconsumer_flush_buffer(stream, 1);
        ret = lttng_ustconsumer_take_snapshot(stream);
@@ -211,23 +236,68 @@ static int check_ust_stream(struct lttng_consumer_stream *stream)
                if (ret != -EAGAIN) {
                        ERR("Taking UST snapshot");
                        ret = -1;
-                       goto error_unlock;
+                       goto end;
                }
                ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
                if (ret < 0) {
                        PERROR("ustctl_get_stream_id");
-                       goto error_unlock;
+                       goto end;
                }
                DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
                ret = send_empty_index(stream, ts, stream_id);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto end;
                }
        }
        ret = 0;
+end:
+       return ret;
+}
 
-error_unlock:
+static int check_ust_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+       assert(stream->ustream);
+       /*
+        * While holding the stream mutex, try to take a snapshot, if it
+        * succeeds, it means that data is ready to be sent, just let the data
+        * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
+        * means that there is no data to read after the flush, so we can
+        * safely send the empty index.
+        *
+        * Doing a trylock and checking if waiting on metadata if
+        * trylock fails. Bail out of the stream is indeed waiting for
+        * metadata to be pushed. Busy wait on trylock otherwise.
+        */
+       for (;;) {
+               ret = pthread_mutex_trylock(&stream->lock);
+               switch (ret) {
+               case 0:
+                       break;  /* We have the lock. */
+               case EBUSY:
+                       pthread_mutex_lock(&stream->metadata_timer_lock);
+                       if (stream->waiting_on_metadata) {
+                               ret = 0;
+                               stream->missed_metadata_flush = true;
+                               pthread_mutex_unlock(&stream->metadata_timer_lock);
+                               goto end;       /* Bail out. */
+                       }
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       /* Try again. */
+                       caa_cpu_relax();
+                       continue;
+               default:
+                       ERR("Unexpected pthread_mutex_trylock error %d", ret);
+                       ret = -1;
+                       goto end;
+               }
+               break;
+       }
+       ret = consumer_flush_ust_index(stream);
        pthread_mutex_unlock(&stream->lock);
+end:
        return ret;
 }
 
index f3fac5d25d8ce791b0767f55bea5fb5d2f6f9d13..f877b8ffe364e21bf53dc7b052bd20861b4b7c4d 100644 (file)
@@ -52,4 +52,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel);
 void *consumer_timer_thread(void *data);
 void consumer_signal_init(void);
 
+int consumer_flush_kernel_index(struct lttng_consumer_stream *stream);
+int consumer_flush_ust_index(struct lttng_consumer_stream *stream);
+
 #endif /* CONSUMER_TIMER_H */
index 3bb2e25d08333348f42c0c97060dc59496d32a5e..0b59c511cf02446939dd87266c4e4044604f3824 100644 (file)
@@ -562,6 +562,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
+       pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
        if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
index 790cb6b135d9b9983a9826e4713112ad586da247..e50835e019f27176143ba857a572b52415c919f4 100644 (file)
@@ -237,6 +237,21 @@ struct lttng_consumer_stream {
        int shm_fd_is_copy;
        int data_read;
        int hangup_flush_done;
+
+       /*
+        * metadata_timer_lock protects flags waiting_on_metadata and
+        * missed_metadata_flush.
+        */
+       pthread_mutex_t metadata_timer_lock;
+       /*
+        * Flag set when awaiting metadata to be pushed. Used in the
+        * timer thread to skip waiting on the stream (and stream lock) to
+        * ensure we can proceed to flushing metadata in live mode.
+        */
+       bool waiting_on_metadata;
+       /* Raised when a timer misses a metadata flush. */
+       bool missed_metadata_flush;
+
        enum lttng_event_output output;
        /* Maximum subbuffer size. */
        unsigned long max_sb_size;
index 5f6b5f49e5fa6d582cb0f0bb44a6b644fbaa59e7..3505fa0ff838d9e821f7ab1bda2b0b9129af3c05 100644 (file)
@@ -1217,7 +1217,22 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                /*
                 * In live, block until all the metadata is sent.
                 */
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               assert(!stream->missed_metadata_flush);
+               stream->waiting_on_metadata = true;
+               pthread_mutex_unlock(&stream->metadata_timer_lock);
+
                err = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               stream->waiting_on_metadata = false;
+               if (stream->missed_metadata_flush) {
+                       stream->missed_metadata_flush = false;
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       (void) consumer_flush_kernel_index(stream);
+               } else {
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+               }
                if (err < 0) {
                        goto end;
                }
index 7eba01d10da7ebb7f3faaa397847af922229bef8..28b7f10a6f7b655a6bf62f7b8594bc21be06515f 100644 (file)
@@ -1028,7 +1028,12 @@ error:
 }
 
 /*
- * Receive the metadata updates from the sessiond.
+ * Receive the metadata updates from the sessiond. Supports receiving
+ * overlapping metadata, but is needs to always belong to a contiguous
+ * range starting from 0.
+ * Be careful about the locks held when calling this function: it needs
+ * the metadata cache flush to concurrently progress in order to
+ * complete.
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
@@ -1440,6 +1445,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                health_code_update();
 
+               if (!len) {
+                       /*
+                        * There is nothing to receive. We have simply
+                        * checked whether the channel can be found.
+                        */
+                       ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+                       goto end_msg_sessiond;
+               }
+
                /* Tell session daemon we are ready to receive the metadata. */
                ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
                if (ret < 0) {
@@ -1773,7 +1787,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        int ret;
 
        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
-       if (stream->chan->metadata_cache->contiguous
+       if (stream->chan->metadata_cache->max_offset
                        == stream->ust_metadata_pushed) {
                ret = 0;
                goto end;
@@ -1781,7 +1795,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
 
        write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
                        &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
-                       stream->chan->metadata_cache->contiguous
+                       stream->chan->metadata_cache->max_offset
                        - stream->ust_metadata_pushed);
        assert(write_len != 0);
        if (write_len < 0) {
@@ -1791,7 +1805,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        }
        stream->ust_metadata_pushed += write_len;
 
-       assert(stream->chan->metadata_cache->contiguous >=
+       assert(stream->chan->metadata_cache->max_offset >=
                        stream->ust_metadata_pushed);
        ret = write_len;
 
@@ -1805,7 +1819,9 @@ end:
  * Sync metadata meaning request them to the session daemon and snapshot to the
  * metadata thread can consumer them.
  *
- * Metadata stream lock MUST be acquired.
+ * Metadata stream lock is held here, but we need to release it when
+ * interacting with sessiond, else we cause a deadlock with live
+ * awaiting on metadata to be pushed out.
  *
  * Return 0 if new metadatda is available, EAGAIN if the metadata stream
  * is empty or a negative value on error.
@@ -1819,6 +1835,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
        assert(ctx);
        assert(metadata);
 
+       pthread_mutex_unlock(&metadata->lock);
        /*
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
@@ -1827,6 +1844,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
        if (ret < 0) {
                goto end;
        }
+       pthread_mutex_lock(&metadata->lock);
 
        ret = commit_one_metadata_packet(metadata);
        if (ret <= 0) {
@@ -2053,7 +2071,23 @@ retry:
                /*
                 * In live, block until all the metadata is sent.
                 */
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               assert(!stream->missed_metadata_flush);
+               stream->waiting_on_metadata = true;
+               pthread_mutex_unlock(&stream->metadata_timer_lock);
+
                err = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+               pthread_mutex_lock(&stream->metadata_timer_lock);
+               stream->waiting_on_metadata = false;
+               if (stream->missed_metadata_flush) {
+                       stream->missed_metadata_flush = false;
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+                       (void) consumer_flush_ust_index(stream);
+               } else {
+                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+               }
+
                if (err < 0) {
                        goto end;
                }
@@ -2134,7 +2168,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                uint64_t contiguous, pushed;
 
                /* Ease our life a bit. */
-               contiguous = stream->chan->metadata_cache->contiguous;
+               contiguous = stream->chan->metadata_cache->max_offset;
                pushed = stream->ust_metadata_pushed;
 
                /*
@@ -2263,6 +2297,10 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  * function or any of its callees. Timers have a very strict locking
  * semantic with respect to teardown. Failure to respect this semantic
  * introduces deadlocks.
+ *
+ * DON'T hold the metadata lock when calling this function, else this
+ * can cause deadlock involving consumer awaiting for metadata to be
+ * pushed out due to concurrent interaction with the session daemon.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *channel, int timer, int wait)
This page took 0.037477 seconds and 4 git commands to generate.