Fix: sessiond: size-based notification occasionally not triggered
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 4 Apr 2023 15:03:03 +0000 (11:03 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 11 Apr 2023 20:26:58 +0000 (16:26 -0400)
Issue observed
==============

When tracing to multiple sessions with scheduled size-based rotations
that occur simultaneously (typically because they trace the same events
and use the same rotation schedule configuration), the start of some
rotations seems to be delayed indefinitely.

Cause
=====

The size-based rotations are implemented by piggy-backing onto the
channel monitoring facilities.

Essentially, a per-channel timer samples a number of statistics on the
consumer daemon end, which transmits them to the session daemon.

The session daemon's notification subsystem evaluates the statistics
against the various registered triggers bound to the channels being
monitored when a statistics sample is received.

To implement size-based rotations, internal triggers are registered with
the "consumed size" condition set to a given threshold. A session
rotation management thread (which also performs other tasks) uses a
notification channel to wait for sessions to reach their target size,
starts rotations as needed, and sets a new threshold according to the
sessions' configured rotation schedule.

The rotation thread uses liblttng-ctl's API to consume notifications
from a notification channel.

At any time, a notification channel may have multiple notifications
queued-up internally in its buffers. This is because a notification
channel multiplexes command replies and notifications over the same UNIX
socket. The current protocol specifies that multiple notifications can
be received before the reply to a command.

In such cases, the notification channel client implementation internally
queues them and provides them on the next calls to
lttng_notification_channel_get_next_notification().

This is correct with respect to the public API, which is intended to be
used in "blocking mode". However, this internal user uses the
notification channel's raw file descriptor to wake-up when a
notification is available.

This is problematic because notifications may be queued by the
notification channel (and thus removed from the socket) while waiting
for command replies (subscribing and unsubscribing from notification
conditions). In such a case, a notification is available but the
rotation thread does not wake-up to consume it as nothing is available
in the socket's buffer.

When this happens, a session that is supposed to rotate automatically
appears to grow indefinitely. It will typically eventually rotate as new
notifications become available and cause the rotation thread to wake-up.
However, a "lag" builds up as the notification that caused the wake-up
is not consumed. Instead, the last buffered notification is provided to
the rotation thread.

Solution
========

Use an event_fd to wake-up the rotation thread whenever a command
completes on the notification channel. This ensures that any
notification that was queued while waiting for a reply to the command is
eventually consumed.

Known drawbacks
===============

None.

Note
====

The use of C++ features is kept to a minimum in this patch in order to
make it easier to backport to the stable releases. A clean-up patch
follows and makes the code conform to the coding standards.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I8974b10124704d1e66e8da32d495fee738e3d43f

src/bin/lttng-sessiond/rotate.cpp
src/bin/lttng-sessiond/rotation-thread.cpp
src/bin/lttng-sessiond/rotation-thread.hpp

index cc857544390ea24674b8da75cf035fc59a0fd430..56a61fe2a35924430ce5dfe6dec548cca458fa87 100644 (file)
@@ -49,6 +49,7 @@ int subscribe_session_consumed_size_rotation(
        int ret;
        enum lttng_condition_status condition_status;
        enum lttng_notification_channel_status nc_status;
+       const uint64_t eventfd_increment_value = 1;
        struct lttng_condition *rotate_condition = nullptr;
        struct lttng_action *notify_action = nullptr;
        const struct lttng_credentials session_creds = {
@@ -108,6 +109,15 @@ int subscribe_session_consumed_size_rotation(
                goto end;
        }
 
+       ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+                         &eventfd_increment_value,
+                         sizeof(eventfd_increment_value));
+       if (ret != sizeof(eventfd_increment_value)) {
+               PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+               ret = -1;
+               goto end;
+       }
+
        ret = notification_thread_command_register_trigger(
                notification_thread_handle, session->rotate_trigger, true);
        if (ret < 0 && ret != -LTTNG_ERR_TRIGGER_EXISTS) {
@@ -132,6 +142,7 @@ int unsubscribe_session_consumed_size_rotation(
 {
        int ret = 0;
        enum lttng_notification_channel_status status;
+       const uint64_t eventfd_increment_value = 1;
 
        LTTNG_ASSERT(session->rotate_trigger);
        status = lttng_notification_channel_unsubscribe(
@@ -143,6 +154,15 @@ int unsubscribe_session_consumed_size_rotation(
                goto end;
        }
 
+       ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+                         &eventfd_increment_value,
+                         sizeof(eventfd_increment_value));
+       if (ret != sizeof(eventfd_increment_value)) {
+               PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+               ret = -1;
+               goto end;
+       }
+
        ret = notification_thread_command_unregister_trigger(notification_thread_handle,
                                                             session->rotate_trigger);
        if (ret != LTTNG_OK) {
index 4a0865eb8940a4faec5bb288ac61e50e1bca79ad..815b4e95f1a5644d78fd18282c4f8cd0b52e6887 100644 (file)
 
 #include <inttypes.h>
 #include <signal.h>
+#include <sys/eventfd.h>
 #include <sys/stat.h>
 #include <time.h>
 #include <urcu.h>
 #include <urcu/list.h>
 
 struct lttng_notification_channel *rotate_notification_channel = nullptr;
+/*
+ * This eventfd is used to wake-up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
 
 struct rotation_thread {
        struct lttng_poll_event events;
@@ -277,6 +285,14 @@ static void fini_thread_state(struct rotation_thread *state)
        if (rotate_notification_channel) {
                lttng_notification_channel_destroy(rotate_notification_channel);
        }
+
+       if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+               const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+               if (close_ret) {
+                       PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+               }
+       }
 }
 
 static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
@@ -305,6 +321,20 @@ static int init_thread_state(struct rotation_thread_handle *handle, struct rotat
                goto end;
        }
 
+       rotate_notification_channel_subscription_change_eventfd =
+               eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+       if (rotate_notification_channel_subscription_change_eventfd < 0) {
+               PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+               ret = -1;
+               goto end;
+       }
+       ret = lttng_poll_add(
+               &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+       if (ret < 0) {
+               ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
+               goto end;
+       }
+
 end:
        return ret;
 }
@@ -714,50 +744,68 @@ static int handle_notification_channel(int fd __attribute__((unused)),
                                       struct rotation_thread *state __attribute__((unused)))
 {
        int ret;
-       bool notification_pending;
+       bool notification_pending = true;
        struct lttng_notification *notification = nullptr;
        enum lttng_notification_channel_status status;
 
-       status = lttng_notification_channel_has_pending_notification(rotate_notification_channel,
-                                                                    &notification_pending);
-       if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("Error occurred while checking for pending notification");
-               ret = -1;
-               goto end;
-       }
+       /*
+        * A notification channel may have multiple notifications queued-up internally in
+        * its buffers. This is because a notification channel multiplexes command replies
+        * and notifications. The current protocol specifies that multiple notifications can be
+        * received before the reply to a command.
+        *
+        * In such cases, the notification channel client implementation internally queues them and
+        * provides them on the next calls to lttng_notification_channel_get_next_notification().
+        * This is correct with respect to the public API, which is intended to be used in "blocking
+        * mode".
+        *
+        * However, this internal user relies on poll/epoll to wake-up when data is available
+        * on the notification channel's socket. As such, it can't assume that a wake-up means only
+        * one notification is available for consumption since many of them may have been queued in
+        * the channel's internal buffers.
+        */
+       while (notification_pending) {
+               status = lttng_notification_channel_has_pending_notification(
+                       rotate_notification_channel, &notification_pending);
+               if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+                       ERR("Error occurred while checking for pending notification");
+                       ret = -1;
+                       goto end;
+               }
 
-       if (!notification_pending) {
-               ret = 0;
-               goto end;
-       }
+               if (!notification_pending) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Receive the next notification. */
-       status = lttng_notification_channel_get_next_notification(rotate_notification_channel,
-                                                                 &notification);
+               /* Receive the next notification. */
+               status = lttng_notification_channel_get_next_notification(
+                       rotate_notification_channel, &notification);
+               switch (status) {
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+                       break;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+                       WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+                       ret = 0;
+                       goto end;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+                       ERR("Notification channel was closed");
+                       ret = -1;
+                       goto end;
+               default:
+                       /* Unhandled conditions / errors. */
+                       ERR("Unknown notification channel status");
+                       ret = -1;
+                       goto end;
+               }
 
-       switch (status) {
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
-               break;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
-               /* Not an error, we will wait for the next one */
-               ret = 0;
-               goto end;
-               ;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
-               ERR("Notification channel was closed");
-               ret = -1;
-               goto end;
-       default:
-               /* Unhandled conditions / errors. */
-               ERR("Unknown notification channel status");
-               ret = -1;
-               goto end;
+               ret = handle_condition(notification, handle->notification_thread_handle);
+               lttng_notification_destroy(notification);
+               if (ret) {
+                       goto end;
+               }
        }
-
-       ret = handle_condition(notification, handle->notification_thread_handle);
-
 end:
-       lttng_notification_destroy(notification);
        return ret;
 }
 
@@ -817,12 +865,23 @@ static void *thread_rotation(void *data)
                                goto error;
                        }
 
-                       if (fd == rotate_notification_channel->socket) {
+                       if (fd == rotate_notification_channel->socket ||
+                           fd == rotate_notification_channel_subscription_change_eventfd) {
                                ret = handle_notification_channel(fd, handle, &thread);
                                if (ret) {
                                        ERR("Error occurred while handling activity on notification channel socket");
                                        goto error;
                                }
+
+                               if (fd == rotate_notification_channel_subscription_change_eventfd) {
+                                       uint64_t eventfd_value;
+                                       const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+                                       if (read_ret != sizeof(eventfd_value)) {
+                                               PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+                                               goto error;
+                                       }
+                               }
                        } else {
                                /* Job queue or quit pipe activity. */
 
index d2bbf2966b9e33a06d6b3d7b153be833ffcf7b0a..6dc7c97ac9a8b65142f3cf5a59c168ba8f0877ac 100644 (file)
@@ -22,6 +22,7 @@
 #include "notification-thread.hpp"
 
 extern struct lttng_notification_channel *rotate_notification_channel;
+extern int rotate_notification_channel_subscription_change_eventfd;
 
 enum rotation_thread_job_type {
        ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
This page took 0.032437 seconds and 4 git commands to generate.