Fix: sessiond: size-based notification occasionally not triggered
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
index 7b0936f48fe70ffcb4607642d38f6dbc1ca96a98..815b4e95f1a5644d78fd18282c4f8cd0b52e6887 100644 (file)
@@ -26,6 +26,7 @@
 #include <common/hashtable/utils.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
 #include <common/time.hpp>
+#include <common/urcu.hpp>
 #include <common/utils.hpp>
 
 #include <lttng/condition/condition-internal.hpp>
 
 #include <inttypes.h>
 #include <signal.h>
+#include <sys/eventfd.h>
 #include <sys/stat.h>
 #include <time.h>
 #include <urcu.h>
 #include <urcu/list.h>
 
-struct lttng_notification_channel *rotate_notification_channel = NULL;
+struct lttng_notification_channel *rotate_notification_channel = nullptr;
+/*
+ * This eventfd is used to wake up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
 
 struct rotation_thread {
        struct lttng_poll_event events;
@@ -87,9 +96,9 @@ static const char *get_job_type_str(enum rotation_thread_job_type job_type)
        }
 }
 
-struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void)
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
 {
-       struct rotation_thread_timer_queue *queue = NULL;
+       struct rotation_thread_timer_queue *queue = nullptr;
 
        queue = zmalloc<rotation_thread_timer_queue>();
        if (!queue) {
@@ -99,7 +108,7 @@ struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void)
 
        queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
        CDS_INIT_LIST_HEAD(&queue->list);
-       pthread_mutex_init(&queue->lock, NULL);
+       pthread_mutex_init(&queue->lock, nullptr);
 end:
        return queue;
 }
@@ -150,7 +159,7 @@ end:
        return handle;
 error:
        rotation_thread_handle_destroy(handle);
-       return NULL;
+       return nullptr;
 }
 
 /*
@@ -180,7 +189,7 @@ void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
 {
        int ret;
        const char dummy = '!';
-       struct rotation_thread_job *job = NULL;
+       struct rotation_thread_job *job = nullptr;
        const char *job_type_str = get_job_type_str(job_type);
 
        pthread_mutex_lock(&queue->lock);
@@ -276,6 +285,14 @@ static void fini_thread_state(struct rotation_thread *state)
        if (rotate_notification_channel) {
                lttng_notification_channel_destroy(rotate_notification_channel);
        }
+
+       if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+               const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+               if (close_ret) {
+                       PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+               }
+       }
 }
 
 static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
@@ -304,6 +321,20 @@ static int init_thread_state(struct rotation_thread_handle *handle, struct rotat
                goto end;
        }
 
+       rotate_notification_channel_subscription_change_eventfd =
+               eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+       if (rotate_notification_channel_subscription_change_eventfd < 0) {
+               PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+               ret = -1;
+               goto end;
+       }
+       ret = lttng_poll_add(
+               &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+       if (ret < 0) {
+               ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
+               goto end;
+       }
+
 end:
        return ret;
 }
@@ -318,6 +349,7 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess
        uint64_t relayd_id;
        bool chunk_exists_on_peer = false;
        enum lttng_trace_chunk_status chunk_status;
+       lttng::urcu::read_lock_guard read_lock;
 
        LTTNG_ASSERT(session->chunk_being_archived);
 
@@ -325,10 +357,10 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess
         * Check for a local pending rotation on all consumers (32-bit
         * user space, 64-bit user space, and kernel).
         */
-       rcu_read_lock();
        if (!session->ust_session) {
                goto skip_ust;
        }
+
        cds_lfht_for_each_entry (
                session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
                relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
@@ -386,7 +418,6 @@ skip_ust:
        }
 skip_kernel:
 end:
-       rcu_read_unlock();
 
        if (!chunk_exists_on_peer) {
                uint64_t chunk_being_archived_id;
@@ -460,7 +491,7 @@ check_session_rotation_pending(struct ltt_session *session,
         * rotations can start now.
         */
        chunk_status = lttng_trace_chunk_get_name(
-               session->chunk_being_archived, &archived_chunk_name, NULL);
+               session->chunk_being_archived, &archived_chunk_name, nullptr);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
        free(session->last_archived_chunk_name);
        session->last_archived_chunk_name = strdup(archived_chunk_name);
@@ -609,7 +640,7 @@ static int handle_condition(const struct lttng_notification *notification,
                            struct notification_thread_handle *notification_thread_handle)
 {
        int ret = 0;
-       const char *condition_session_name = NULL;
+       const char *condition_session_name = nullptr;
        enum lttng_condition_type condition_type;
        enum lttng_condition_status condition_status;
        enum lttng_evaluation_status evaluation_status;
@@ -673,7 +704,7 @@ static int handle_condition(const struct lttng_notification *notification,
        }
 
        ret = cmd_rotate_session(
-               session, NULL, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
        switch (ret) {
        case LTTNG_OK:
                break;
@@ -713,50 +744,68 @@ static int handle_notification_channel(int fd __attribute__((unused)),
                                       struct rotation_thread *state __attribute__((unused)))
 {
        int ret;
-       bool notification_pending;
-       struct lttng_notification *notification = NULL;
+       bool notification_pending = true;
+       struct lttng_notification *notification = nullptr;
        enum lttng_notification_channel_status status;
 
-       status = lttng_notification_channel_has_pending_notification(rotate_notification_channel,
-                                                                    &notification_pending);
-       if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("Error occurred while checking for pending notification");
-               ret = -1;
-               goto end;
-       }
+       /*
+        * A notification channel may have multiple notifications queued up internally in
+        * its buffers. This is because a notification channel multiplexes command replies
+        * and notifications. The current protocol specifies that multiple notifications can be
+        * received before the reply to a command.
+        *
+        * In such cases, the notification channel client implementation internally queues them and
+        * provides them on the next calls to lttng_notification_channel_get_next_notification().
+        * This is correct with respect to the public API, which is intended to be used in "blocking
+        * mode".
+        *
+        * However, this internal user relies on poll/epoll to wake up when data is available
+        * on the notification channel's socket. As such, it can't assume that a wake-up means only
+        * one notification is available for consumption since many of them may have been queued in
+        * the channel's internal buffers.
+        */
+       while (notification_pending) {
+               status = lttng_notification_channel_has_pending_notification(
+                       rotate_notification_channel, &notification_pending);
+               if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+                       ERR("Error occurred while checking for pending notification");
+                       ret = -1;
+                       goto end;
+               }
 
-       if (!notification_pending) {
-               ret = 0;
-               goto end;
-       }
+               if (!notification_pending) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Receive the next notification. */
-       status = lttng_notification_channel_get_next_notification(rotate_notification_channel,
-                                                                 &notification);
+               /* Receive the next notification. */
+               status = lttng_notification_channel_get_next_notification(
+                       rotate_notification_channel, &notification);
+               switch (status) {
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+                       break;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+                       WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+                       ret = 0;
+                       goto end;
+               case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+                       ERR("Notification channel was closed");
+                       ret = -1;
+                       goto end;
+               default:
+                       /* Unhandled conditions / errors. */
+                       ERR("Unknown notification channel status");
+                       ret = -1;
+                       goto end;
+               }
 
-       switch (status) {
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
-               break;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
-               /* Not an error, we will wait for the next one */
-               ret = 0;
-               goto end;
-               ;
-       case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
-               ERR("Notification channel was closed");
-               ret = -1;
-               goto end;
-       default:
-               /* Unhandled conditions / errors. */
-               ERR("Unknown notification channel status");
-               ret = -1;
-               goto end;
+               ret = handle_condition(notification, handle->notification_thread_handle);
+               lttng_notification_destroy(notification);
+               if (ret) {
+                       goto end;
+               }
        }
-
-       ret = handle_condition(notification, handle->notification_thread_handle);
-
 end:
-       lttng_notification_destroy(notification);
        return ret;
 }
 
@@ -816,12 +865,23 @@ static void *thread_rotation(void *data)
                                goto error;
                        }
 
-                       if (fd == rotate_notification_channel->socket) {
+                       if (fd == rotate_notification_channel->socket ||
+                           fd == rotate_notification_channel_subscription_change_eventfd) {
                                ret = handle_notification_channel(fd, handle, &thread);
                                if (ret) {
                                        ERR("Error occurred while handling activity on notification channel socket");
                                        goto error;
                                }
+
+                               if (fd == rotate_notification_channel_subscription_change_eventfd) {
+                                       uint64_t eventfd_value;
+                                       const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+                                       if (read_ret != sizeof(eventfd_value)) {
+                                               PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+                                               goto error;
+                                       }
+                               }
                        } else {
                                /* Job queue or quit pipe activity. */
 
@@ -862,7 +922,7 @@ end:
        health_unregister(the_health_sessiond);
        rcu_thread_offline();
        rcu_unregister_thread();
-       return NULL;
+       return nullptr;
 }
 
 static bool shutdown_rotation_thread(void *thread_data)
@@ -878,7 +938,7 @@ bool launch_rotation_thread(struct rotation_thread_handle *handle)
        struct lttng_thread *thread;
 
        thread = lttng_thread_create(
-               "Rotation", thread_rotation, shutdown_rotation_thread, NULL, handle);
+               "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
        if (!thread) {
                goto error;
        }
This page took 0.028487 seconds and 4 git commands to generate.