From: Jérémie Galarneau Date: Tue, 4 Apr 2023 15:03:03 +0000 (-0400) Subject: Fix: sessiond: size-based notification occasionally not triggered X-Git-Url: https://git.liburcu.org/?a=commitdiff_plain;ds=sidebyside;h=dc65dda314fcd676fabfe73942c34cb93b7fea40;p=lttng-tools.git Fix: sessiond: size-based notification occasionally not triggered Issue observed ============== When tracing to multiple sessions with scheduled size-based rotations that occur simultaneously (typically because they trace the same events and use the same rotation schedule configuration), the start of some rotations seems to be delayed indefinitely. Cause ===== The size-based rotations are implemented by piggy-backing onto the channel monitoring facilities. Essentially, a per-channel timer samples a number of statistics on the consumer daemon end, which transmits them to the session daemon. The session daemon's notification subsystem evaluates the statistics against the various registered triggers bound to the channels being monitored when a statistics sample is received. To implement size-based rotations, internal triggers are registered with the "consumed size" condition set to a given threshold. A session rotation management thread (which also performs other tasks) uses a notification channel to wait for sessions to reach their target size, starts rotations as needed, and sets a new threshold according to the sessions' configured rotation schedule. The rotation thread uses liblttng-ctl's API to consume notifications from a notification channel. At any time, a notification channel may have multiple notifications queued-up internally in its buffers. This is because a notification channel multiplexes command replies and notifications over the same UNIX socket. The current protocol specifies that multiple notifications can be received before the reply to a command. In such cases, the notification channel client implementation internally queues them and provides them on the next calls to lttng_notification_channel_get_next_notification(). This is correct with respect to the public API, which is intended to be used in "blocking mode". However, this internal user uses the notification channel's raw file descriptor to wake-up when a notification is available. This is problematic because notifications may be queued by the notification channel (and thus removed from the socket) while waiting for command replies (subscribing and unsubscribing from notification conditions). In such a case, a notification is available but the rotation thread does not wake-up to consume it as nothing is available in the socket's buffer. When this happens, a session that is supposed to rotate automatically appears to grow indefinitely. It will typically eventually rotate as new notifications become available and cause the rotation thread to wake-up. However, a "lag" builds up as the notification that caused the wake-up is not consumed. Instead, the last buffered notification is provided to the rotation thread. Solution ======== Use an event_fd to wake-up the rotation thread whenever a command completes on the notification channel. This ensures that any notification that was queued while waiting for a reply to the command is eventually consumed. Known drawbacks =============== None. Note ==== The use of C++ features is kept to a minimum in this patch in order to make it easier to backport to the stable releases. A clean-up patch follows and makes the code conform to the coding standards. Signed-off-by: Jérémie Galarneau Change-Id: I8974b10124704d1e66e8da32d495fee738e3d43f --- diff --git a/src/bin/lttng-sessiond/rotate.cpp b/src/bin/lttng-sessiond/rotate.cpp index cc8575443..56a61fe2a 100644 --- a/src/bin/lttng-sessiond/rotate.cpp +++ b/src/bin/lttng-sessiond/rotate.cpp @@ -49,6 +49,7 @@ int subscribe_session_consumed_size_rotation( int ret; enum lttng_condition_status condition_status; enum lttng_notification_channel_status nc_status; + const uint64_t eventfd_increment_value = 1; struct lttng_condition *rotate_condition = nullptr; struct lttng_action *notify_action = nullptr; const struct lttng_credentials session_creds = { @@ -108,6 +109,15 @@ int subscribe_session_consumed_size_rotation( goto end; } + ret = lttng_write(rotate_notification_channel_subscription_change_eventfd, + &eventfd_increment_value, + sizeof(eventfd_increment_value)); + if (ret != sizeof(eventfd_increment_value)) { + PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed"); + ret = -1; + goto end; + } + ret = notification_thread_command_register_trigger( notification_thread_handle, session->rotate_trigger, true); if (ret < 0 && ret != -LTTNG_ERR_TRIGGER_EXISTS) { @@ -132,6 +142,7 @@ int unsubscribe_session_consumed_size_rotation( { int ret = 0; enum lttng_notification_channel_status status; + const uint64_t eventfd_increment_value = 1; LTTNG_ASSERT(session->rotate_trigger); status = lttng_notification_channel_unsubscribe( @@ -143,6 +154,15 @@ int unsubscribe_session_consumed_size_rotation( goto end; } + ret = lttng_write(rotate_notification_channel_subscription_change_eventfd, + &eventfd_increment_value, + sizeof(eventfd_increment_value)); + if (ret != sizeof(eventfd_increment_value)) { + PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed"); + ret = -1; + goto end; + } + ret = notification_thread_command_unregister_trigger(notification_thread_handle, session->rotate_trigger); if (ret != LTTNG_OK) { diff --git a/src/bin/lttng-sessiond/rotation-thread.cpp b/src/bin/lttng-sessiond/rotation-thread.cpp index 4a0865eb8..815b4e95f 100644 --- a/src/bin/lttng-sessiond/rotation-thread.cpp +++ b/src/bin/lttng-sessiond/rotation-thread.cpp @@ -38,12 +38,20 @@ #include #include +#include #include #include #include #include struct lttng_notification_channel *rotate_notification_channel = nullptr; +/* + * This eventfd is used to wake-up the rotation thread whenever a command + * completes on the notification channel. This ensures that any notification + * that was queued while waiting for a reply to the command is eventually + * consumed. + */ +int rotate_notification_channel_subscription_change_eventfd = -1; struct rotation_thread { struct lttng_poll_event events; @@ -277,6 +285,14 @@ static void fini_thread_state(struct rotation_thread *state) if (rotate_notification_channel) { lttng_notification_channel_destroy(rotate_notification_channel); } + + if (rotate_notification_channel_subscription_change_eventfd >= 0) { + const int close_ret = close(rotate_notification_channel_subscription_change_eventfd); + + if (close_ret) { + PERROR("Failed to close rotation thread notification channel subscription change eventfd"); + } + } } static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state) @@ -305,6 +321,20 @@ static int init_thread_state(struct rotation_thread_handle *handle, struct rotat goto end; } + rotate_notification_channel_subscription_change_eventfd = + eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); + if (rotate_notification_channel_subscription_change_eventfd < 0) { + PERROR("Failed to create rotation thread notification channel subscription change eventfd"); + ret = -1; + goto end; + } + ret = lttng_poll_add( + &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN); + if (ret < 0) { + ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset"); + goto end; + } + end: return ret; } @@ -714,50 +744,68 @@ static int handle_notification_channel(int fd __attribute__((unused)), struct rotation_thread *state __attribute__((unused))) { int ret; - bool notification_pending; + bool notification_pending = true; struct lttng_notification *notification = nullptr; enum lttng_notification_channel_status status; - status = lttng_notification_channel_has_pending_notification(rotate_notification_channel, - ¬ification_pending); - if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Error occurred while checking for pending notification"); - ret = -1; - goto end; - } + /* + * A notification channel may have multiple notifications queued-up internally in + * its buffers. This is because a notification channel multiplexes command replies + * and notifications. The current protocol specifies that multiple notifications can be + * received before the reply to a command. + * + * In such cases, the notification channel client implementation internally queues them and + * provides them on the next calls to lttng_notification_channel_get_next_notification(). + * This is correct with respect to the public API, which is intended to be used in "blocking + * mode". + * + * However, this internal user relies on poll/epoll to wake-up when data is available + * on the notification channel's socket. As such, it can't assume that a wake-up means only + * one notification is available for consumption since many of them may have been queued in + * the channel's internal buffers. + */ + while (notification_pending) { + status = lttng_notification_channel_has_pending_notification( + rotate_notification_channel, ¬ification_pending); + if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + ERR("Error occurred while checking for pending notification"); + ret = -1; + goto end; + } - if (!notification_pending) { - ret = 0; - goto end; - } + if (!notification_pending) { + ret = 0; + goto end; + } - /* Receive the next notification. */ - status = lttng_notification_channel_get_next_notification(rotate_notification_channel, - ¬ification); + /* Receive the next notification. */ + status = lttng_notification_channel_get_next_notification( + rotate_notification_channel, ¬ification); + switch (status) { + case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: + break; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: + WARN("Dropped notification detected on notification channel used by the rotation management thread."); + ret = 0; + goto end; + case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: + ERR("Notification channel was closed"); + ret = -1; + goto end; + default: + /* Unhandled conditions / errors. */ + ERR("Unknown notification channel status"); + ret = -1; + goto end; + } - switch (status) { - case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: - break; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: - /* Not an error, we will wait for the next one */ - ret = 0; - goto end; - ; - case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: - ERR("Notification channel was closed"); - ret = -1; - goto end; - default: - /* Unhandled conditions / errors. */ - ERR("Unknown notification channel status"); - ret = -1; - goto end; + ret = handle_condition(notification, handle->notification_thread_handle); + lttng_notification_destroy(notification); + if (ret) { + goto end; + } } - - ret = handle_condition(notification, handle->notification_thread_handle); - end: - lttng_notification_destroy(notification); return ret; } @@ -817,12 +865,23 @@ static void *thread_rotation(void *data) goto error; } - if (fd == rotate_notification_channel->socket) { + if (fd == rotate_notification_channel->socket || + fd == rotate_notification_channel_subscription_change_eventfd) { ret = handle_notification_channel(fd, handle, &thread); if (ret) { ERR("Error occurred while handling activity on notification channel socket"); goto error; } + + if (fd == rotate_notification_channel_subscription_change_eventfd) { + uint64_t eventfd_value; + const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value)); + + if (read_ret != sizeof(eventfd_value)) { + PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd"); + goto error; + } + } } else { /* Job queue or quit pipe activity. */ diff --git a/src/bin/lttng-sessiond/rotation-thread.hpp b/src/bin/lttng-sessiond/rotation-thread.hpp index d2bbf2966..6dc7c97ac 100644 --- a/src/bin/lttng-sessiond/rotation-thread.hpp +++ b/src/bin/lttng-sessiond/rotation-thread.hpp @@ -22,6 +22,7 @@ #include "notification-thread.hpp" extern struct lttng_notification_channel *rotate_notification_channel; +extern int rotate_notification_channel_subscription_change_eventfd; enum rotation_thread_job_type { ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,