Fix: consumerd: slow metadata push slows down application registration
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 29 Jun 2023 18:04:37 +0000 (14:04 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 27 Jul 2023 15:48:49 +0000 (11:48 -0400)
Issue observed
--------------

When rotating the channels of a session configured with a "per-pid"
buffer sharing policy, applications with a long registration
timeout (e.g. LTTNG_UST_REGISTER_TIMEOUT=-1, see LTTNG-UST(3)) sometimes
experience long start-up times.

Cause
-----

The session list lock is held during the registration of an application
and during the setup of a rotation.

While setting up a rotation in the userspace domain, the session daemon
flushes its metadata cache to the userspace consumer daemon and waits
for a confirmation that all metadata emitted before that point in time
has been serialized (whether on disk or sent through a network output).

As the consumer daemon waits for the metadata to be consumed, it
periodically checks the metadata stream's output position with a 200ms
delay (see DEFAULT_METADATA_AVAILABILITY_WAIT_TIME).

In practice, in per-uid mode, this delay is seldomly encountered since
the metadata has already been pushed by the consumption thread.
Moreover, if it was not, a single polling iteration will typically
suffice.

However, in per-pid buffering mode and with a sustained "heavy" data
production rate, this delay becomes problematic since:
  - metadata is pushed for every application,
  - the delay is hit almost systematically as the consumption thread
    is busy and has to catch up to consume the most recent metadata.

Hence, some rotation setups can easily take multiple seconds (at least
200ms per application). This makes the locking scheme employed on that
path unsuitable as it blocks some operations (like application
registrations) for an extended period of time.

Solution
--------

The polling "back-off" delay is eliminated by using a waiter that allows
the consumer daemon thread that runs the metadata push command to
wake-up whenever the criteria used to evaluate the "pushed" metadata
position are changed.

Those criteria are:
  - the metadata stream's pushed position
  - the lifetime of the metadata channel's stream
  - the status of the session's endpoint

Whenever those states are affected, the waiters are woken-up to force a
re-evaluation of the metadata cache flush position and, eventually,
cause the metadata push command to complete.

Note
----

The waiter queue is adapted from urcu-wait.h of liburcu (also LGPL
licensed).

Change-Id: Ib86c2e878abe205c73f930e6de958c0b10486a37
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
.clang-format
src/bin/lttng-sessiond/notification-thread-events.cpp
src/common/consumer/consumer-metadata-cache.cpp
src/common/consumer/consumer-metadata-cache.hpp
src/common/consumer/consumer-timer.cpp
src/common/consumer/consumer.cpp
src/common/consumer/consumer.hpp
src/common/ust-consumer/ust-consumer.cpp
src/common/ust-consumer/ust-consumer.hpp
src/common/waiter.cpp
src/common/waiter.hpp

index f854e32c923ac59c8f83bf0a33d6b0bba42f83ab..28ea69873145dcb40b167d659a3b828b8968e9b8 100644 (file)
@@ -54,6 +54,7 @@ ForEachMacros:
   - 'cds_list_for_each_entry_safe'
   - 'for_each_action_mutable'
   - 'for_each_action_const'
+  - 'cds_wfs_for_each_blocking_safe'
 
 IncludeBlocks: Regroup
 IncludeCategories:
index aaf24b3f0f404391cdce74a31fa0a233e63aa1c0..f18bc68d44760c969b01c347500c90214c201982 100644 (file)
@@ -3316,12 +3316,12 @@ end:
                free(cmd);
                cmd = nullptr;
        } else {
-               lttng_waiter_wake_up(&cmd->reply_waiter);
+               lttng_waiter_wake(&cmd->reply_waiter);
        }
        return ret;
 error_unlock:
        /* Wake-up and return a fatal error to the calling thread. */
-       lttng_waiter_wake_up(&cmd->reply_waiter);
+       lttng_waiter_wake(&cmd->reply_waiter);
        cmd->reply_code = LTTNG_ERR_FATAL;
 error:
        /* Indicate a fatal error to the caller. */
index 9af9ab0fc67ecb1d16f5f5a6a77c6780550f8d06..462e079d8c705dfb5785244fd67f3cd4b63d5f59 100644 (file)
@@ -184,18 +184,16 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel)
 /*
  * Check if the cache is flushed up to the offset passed in parameter.
  *
- * Return 0 if everything has been flushed, 1 if there is data not flushed.
+ * Return true if everything has been flushed, false if there is data not flushed.
  */
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
-                                   uint64_t offset,
-                                   int timer)
+namespace {
+bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel,
+                                       uint64_t offset,
+                                       int timer)
 {
-       int ret = 0;
+       bool done_flushing = false;
        struct lttng_consumer_stream *metadata_stream;
 
-       LTTNG_ASSERT(channel);
-       LTTNG_ASSERT(channel->metadata_cache);
-
        /*
         * If not called from a timer handler, we have to take the
         * channel lock to be mutually exclusive with channel teardown.
@@ -213,7 +211,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
                 * Having no metadata stream means the channel is being destroyed so there
                 * is no cache to flush anymore.
                 */
-               ret = 0;
+               done_flushing = true;
                goto end_unlock_channel;
        }
 
@@ -221,22 +219,57 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
        pthread_mutex_lock(&channel->metadata_cache->lock);
 
        if (metadata_stream->ust_metadata_pushed >= offset) {
-               ret = 0;
+               done_flushing = true;
        } else if (channel->metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
                /* An inactive endpoint means we don't have to flush anymore. */
-               ret = 0;
+               done_flushing = true;
        } else {
                /* Still not completely flushed. */
-               ret = 1;
+               done_flushing = false;
        }
 
        pthread_mutex_unlock(&channel->metadata_cache->lock);
        pthread_mutex_unlock(&metadata_stream->lock);
+
 end_unlock_channel:
        pthread_mutex_unlock(&channel->timer_lock);
        if (!timer) {
                pthread_mutex_unlock(&channel->lock);
        }
 
-       return ret;
+       return done_flushing;
+}
+} /* namespace */
+
+/*
+ * Wait until the cache is flushed up to the offset passed in parameter or the
+ * metadata stream has been destroyed.
+ */
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+                                         uint64_t offset,
+                                         bool invoked_by_timer)
+{
+       assert(channel);
+       assert(channel->metadata_cache);
+
+       if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+               return;
+       }
+
+       /* Metadata cache is not currently flushed, wait on wait queue. */
+       for (;;) {
+               struct lttng_waiter waiter;
+
+               lttng_waiter_init(&waiter);
+               lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter);
+               if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+                       /* Wake up all waiters, ourself included. */
+                       lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+                       /* Ensure proper teardown of waiter. */
+                       lttng_waiter_wait(&waiter);
+                       break;
+               }
+
+               lttng_waiter_wait(&waiter);
+       }
 }
index 9f958ac58704b2b81724fbc4efd0a9e6fc5a9ef2..529b9f0756f560023393f9bacb16b89470df785d 100644 (file)
@@ -58,8 +58,8 @@ consumer_metadata_cache_write(struct consumer_metadata_cache *cache,
                              const char *data);
 int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
 void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
-                                   uint64_t offset,
-                                   int timer);
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+                                         uint64_t offset,
+                                         bool invoked_by_timer);
 
 #endif /* CONSUMER_METADATA_CACHE_H */
index 133ec6c0eaaca52d6348241b5ca61e08556eb41c..8c9371bae780191207c345e3a39184f4b24c97ab 100644 (file)
@@ -96,7 +96,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo
                 * - metadata_socket_lock
                 *   - Calling lttng_ustconsumer_recv_metadata():
                 *     - channel->metadata_cache->lock
-                *     - Calling consumer_metadata_cache_flushed():
+                *     - Calling consumer_wait_metadata_cache_flushed():
                 *       - channel->timer_lock
                 *         - channel->metadata_cache->lock
                 *
@@ -105,7 +105,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo
                 * they are held while consumer_timer_switch_stop() is
                 * called.
                 */
-               ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
+               ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
                if (ret < 0) {
                        channel->switch_timer_error = 1;
                }
index 01845871b91a2a2423e4375e983de1cac6243b7f..4823d9b9c59aa6f9933d9e7552fe27fb48865cb6 100644 (file)
@@ -462,6 +462,8 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
        cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
+                       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
@@ -1033,8 +1035,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->monitor = monitor;
        channel->live_timer_interval = live_timer_interval;
        channel->is_live = is_in_live_session;
-       pthread_mutex_init(&channel->lock, nullptr);
-       pthread_mutex_init(&channel->timer_lock, nullptr);
+       pthread_mutex_init(&channel->lock, NULL);
+       pthread_mutex_init(&channel->timer_lock, NULL);
+       lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
 
        switch (output) {
        case LTTNG_EVENT_SPLICE:
@@ -2130,6 +2133,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
         * pointer value.
         */
        channel->metadata_stream = nullptr;
+       lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
index 053744904c7c7907dadae920520f6f46cc9b0d0d..9de8d05a3cb188b98c12e8c0c5ac1a476500591a 100644 (file)
@@ -20,6 +20,7 @@
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/trace-chunk-registry.hpp>
 #include <common/uuid.hpp>
+#include <common/waiter.hpp>
 
 #include <lttng/lttng.h>
 
@@ -184,6 +185,11 @@ struct lttng_consumer_channel {
        /* Metadata cache is metadata channel */
        struct consumer_metadata_cache *metadata_cache;
 
+       /*
+        * Wait queue awaiting updates to metadata stream's flushed position.
+        */
+       struct lttng_wait_queue metadata_pushed_wait_queue;
+
        /* For UST metadata periodical flush */
        int switch_timer_enabled;
        timer_t switch_timer;
index 6274cdcc5341bdfa62bdcca12e8f8c945bb50007..00671a876a8bb637d631b58e31d1bb55cd793322 100644 (file)
@@ -930,6 +930,8 @@ error:
         */
        consumer_stream_destroy(metadata->metadata_stream, nullptr);
        metadata->metadata_stream = nullptr;
+       lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
 send_streams_error:
 error_no_stream:
 end:
@@ -967,7 +969,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
        if (ret < 0) {
                goto error;
        }
@@ -1013,6 +1015,7 @@ error_stream:
         */
        consumer_stream_destroy(metadata_stream, nullptr);
        metadata_channel->metadata_stream = nullptr;
+       lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
 
 error:
        return ret;
@@ -1251,7 +1254,7 @@ int lttng_ustconsumer_recv_metadata(int sock,
                                    uint64_t len,
                                    uint64_t version,
                                    struct lttng_consumer_channel *channel,
-                                   int timer,
+                                   bool invoked_by_timer,
                                    int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
@@ -1339,13 +1342,8 @@ int lttng_ustconsumer_recv_metadata(int sock,
        if (!wait) {
                goto end_free;
        }
-       while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
-               DBG("Waiting for metadata to be flushed");
-
-               health_code_update();
 
-               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
-       }
+       consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
 
 end_free:
        free(metadata_str);
@@ -1790,7 +1788,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                ret = lttng_ustconsumer_recv_metadata(
-                       sock, key, offset, len, version, found_channel, 0, 1);
+                       sock, key, offset, len, version, found_channel, false, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_push_metadata_fatal;
@@ -2551,6 +2549,7 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
                goto end;
        }
        stream->ust_metadata_pushed += write_len;
+       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
 
        LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
        ret = write_len;
@@ -2599,7 +2598,7 @@ lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
        pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
                status = SYNC_METADATA_STATUS_ERROR;
@@ -3238,7 +3237,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
                                       struct lttng_consumer_channel *channel,
-                                      int timer,
+                                      bool invoked_by_timer,
                                       int wait)
 {
        struct lttcomm_metadata_request_msg request;
@@ -3343,8 +3342,14 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 
        health_code_update();
 
-       ret = lttng_ustconsumer_recv_metadata(
-               ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
+       ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+                                             key,
+                                             offset,
+                                             len,
+                                             version,
+                                             channel,
+                                             invoked_by_timer,
+                                             wait);
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
index 7d0e00614ab89117d4ff770afdef383385c00e0f..13de36f1ff5340c31d416c5f7ab735662e20c042 100644 (file)
@@ -51,11 +51,11 @@ int lttng_ustconsumer_recv_metadata(int sock,
                                    uint64_t len,
                                    uint64_t version,
                                    struct lttng_consumer_channel *channel,
-                                   int timer,
+                                   bool invoked_by_timer,
                                    int wait);
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
                                       struct lttng_consumer_channel *channel,
-                                      int timer,
+                                      bool invoked_by_timer,
                                       int wait);
 enum sync_metadata_status lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
                                                          struct lttng_consumer_stream *metadata);
@@ -195,7 +195,7 @@ static inline int lttng_ustconsumer_recv_metadata(int sock __attribute__((unused
                                                  uint64_t version __attribute__((unused)),
                                                  struct lttng_consumer_channel *channel
                                                  __attribute__((unused)),
-                                                 int timer __attribute__((unused)))
+                                                 bool invoked_by_timer __attribute__((unused)))
 {
        return -ENOSYS;
 }
@@ -204,7 +204,7 @@ static inline int lttng_ustconsumer_request_metadata(struct lttng_consumer_local
                                                     __attribute__((unused)),
                                                     struct lttng_consumer_channel *channel
                                                     __attribute__((unused)),
-                                                    int timer __attribute__((unused)),
+                                                    bool invoked_by_timer __attribute__((unused)),
                                                     int wait __attribute__((unused)))
 {
        return -ENOSYS;
index c0ea3392fceea52f1438b6e839c1742e7348503b..d185e299720617e64e7514d114e33cea4a0eb386 100644 (file)
@@ -7,6 +7,7 @@
  */
 
 #include "error.hpp"
+#include "macros.hpp"
 #include "waiter.hpp"
 
 #include <poll.h>
@@ -41,15 +42,18 @@ void lttng_waiter_wait(struct lttng_waiter *waiter)
 {
        unsigned int i;
 
-       DBG("Beginning of waiter wait period");
-       /* Load and test condition before read state */
+       DBG("Beginning of waiter \"wait\" period");
+
+       /* Load and test condition before read state. */
        cmm_smp_rmb();
        for (i = 0; i < WAIT_ATTEMPTS; i++) {
                if (uatomic_read(&waiter->state) != WAITER_WAITING) {
                        goto skip_futex_wait;
                }
+
                caa_cpu_relax();
        }
+
        while (uatomic_read(&waiter->state) == WAITER_WAITING) {
                if (!futex_noasync(
                            &waiter->state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) {
@@ -63,6 +67,7 @@ void lttng_waiter_wait(struct lttng_waiter *waiter)
                         */
                        continue;
                }
+
                switch (errno) {
                case EAGAIN:
                        /* Value already changed. */
@@ -89,13 +94,16 @@ skip_futex_wait:
                if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) {
                        break;
                }
+
                caa_cpu_relax();
        }
+
        while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) {
                poll(nullptr, 0, 10);
        }
+
        LTTNG_ASSERT(uatomic_read(&waiter->state) & WAITER_TEARDOWN);
-       DBG("End of waiter wait period");
+       DBG("End of waiter \"wait\" period");
 }
 
 /*
@@ -103,7 +111,7 @@ skip_futex_wait:
  * execution. In this scheme, the waiter owns the node memory, and we only allow
  * it to free this memory when it sees the WAITER_TEARDOWN flag.
  */
-void lttng_waiter_wake_up(struct lttng_waiter *waiter)
+void lttng_waiter_wake(struct lttng_waiter *waiter)
 {
        cmm_smp_mb();
        LTTNG_ASSERT(uatomic_read(&waiter->state) == WAITER_WAITING);
@@ -114,6 +122,38 @@ void lttng_waiter_wake_up(struct lttng_waiter *waiter)
                        abort();
                }
        }
+
        /* Allow teardown of struct urcu_wait memory. */
        uatomic_or(&waiter->state, WAITER_TEARDOWN);
 }
+
+void lttng_wait_queue_init(struct lttng_wait_queue *queue)
+{
+       cds_wfs_init(&queue->stack);
+}
+
+void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter)
+{
+       (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node);
+}
+
+void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue)
+{
+       cds_wfs_head *waiters;
+       cds_wfs_node *iter, *iter_n;
+
+       /* Move all waiters from the queue to our local stack. */
+       waiters = __cds_wfs_pop_all(&queue->stack);
+
+       /* Wake all waiters in our stack head. */
+       cds_wfs_for_each_blocking_safe (waiters, iter, iter_n) {
+               auto *waiter = lttng::utils::container_of(iter, &lttng_waiter::wait_queue_node);
+
+               /* Don't wake already running threads. */
+               if (waiter->state & WAITER_RUNNING) {
+                       continue;
+               }
+
+               lttng_waiter_wake(waiter);
+       }
+}
index d88fe0f66af96a1029afe019e925fb6d2cb7d809..8ea0c2d83f450ad7ff6da749f57e6e0424c68e13 100644 (file)
@@ -23,15 +23,33 @@ struct lttng_waiter {
        int32_t state;
 };
 
+struct lttng_wait_queue {
+       struct cds_wfs_stack stack;
+};
+
 void lttng_waiter_init(struct lttng_waiter *waiter);
 
 void lttng_waiter_wait(struct lttng_waiter *waiter);
 
 /*
- * lttng_waiter_wake_up must only be called by a single waker.
+ * lttng_waiter_wake must only be called by a single waker.
  * It is invalid for multiple "wake" operations to be invoked
  * on a single waiter without re-initializing it before.
  */
-void lttng_waiter_wake_up(struct lttng_waiter *waiter);
+void lttng_waiter_wake(struct lttng_waiter *waiter);
+
+void lttng_wait_queue_init(struct lttng_wait_queue *queue);
+
+/*
+ * Atomically add a waiter to a wait queue.
+ * A full memory barrier is issued before being added to the wait queue.
+ */
+void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter);
+
+/*
+ * Wake every waiter present in the wait queue and remove them from
+ * the queue.
+ */
+void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue);
 
 #endif /* LTTNG_WAITER_H */
This page took 0.036897 seconds and 4 git commands to generate.