int ret;
enum lttng_condition_status condition_status;
enum lttng_notification_channel_status nc_status;
+ const uint64_t eventfd_increment_value = 1;
struct lttng_condition *rotate_condition = nullptr;
struct lttng_action *notify_action = nullptr;
const struct lttng_credentials session_creds = {
goto end;
}
+ ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+ &eventfd_increment_value,
+ sizeof(eventfd_increment_value));
+ if (ret != sizeof(eventfd_increment_value)) {
+ PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+ ret = -1;
+ goto end;
+ }
+
ret = notification_thread_command_register_trigger(
notification_thread_handle, session->rotate_trigger, true);
if (ret < 0 && ret != -LTTNG_ERR_TRIGGER_EXISTS) {
{
int ret = 0;
enum lttng_notification_channel_status status;
+ const uint64_t eventfd_increment_value = 1;
LTTNG_ASSERT(session->rotate_trigger);
status = lttng_notification_channel_unsubscribe(
goto end;
}
+ ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
+ &eventfd_increment_value,
+ sizeof(eventfd_increment_value));
+ if (ret != sizeof(eventfd_increment_value)) {
+ PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
+ ret = -1;
+ goto end;
+ }
+
ret = notification_thread_command_unregister_trigger(notification_thread_handle,
session->rotate_trigger);
if (ret != LTTNG_OK) {
#include <inttypes.h>
#include <signal.h>
+#include <sys/eventfd.h>
#include <sys/stat.h>
#include <time.h>
#include <urcu.h>
#include <urcu/list.h>
struct lttng_notification_channel *rotate_notification_channel = nullptr;
+/*
+ * This eventfd is used to wake-up the rotation thread whenever a command
+ * completes on the notification channel. This ensures that any notification
+ * that was queued while waiting for a reply to the command is eventually
+ * consumed.
+ */
+int rotate_notification_channel_subscription_change_eventfd = -1;
struct rotation_thread {
struct lttng_poll_event events;
if (rotate_notification_channel) {
lttng_notification_channel_destroy(rotate_notification_channel);
}
+
+ if (rotate_notification_channel_subscription_change_eventfd >= 0) {
+ const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
+
+ if (close_ret) {
+ PERROR("Failed to close rotation thread notification channel subscription change eventfd");
+ }
+ }
}
static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
goto end;
}
+ rotate_notification_channel_subscription_change_eventfd =
+ eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+ if (rotate_notification_channel_subscription_change_eventfd < 0) {
+ PERROR("Failed to create rotation thread notification channel subscription change eventfd");
+ ret = -1;
+ goto end;
+ }
+ ret = lttng_poll_add(
+ &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
+ if (ret < 0) {
+ ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
+ goto end;
+ }
+
end:
return ret;
}
struct rotation_thread *state __attribute__((unused)))
{
int ret;
- bool notification_pending;
+ bool notification_pending = true;
struct lttng_notification *notification = nullptr;
enum lttng_notification_channel_status status;
- status = lttng_notification_channel_has_pending_notification(rotate_notification_channel,
- ¬ification_pending);
- if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
- ERR("Error occurred while checking for pending notification");
- ret = -1;
- goto end;
- }
+ /*
+ * A notification channel may have multiple notifications queued-up internally in
+ * its buffers. This is because a notification channel multiplexes command replies
+ * and notifications. The current protocol specifies that multiple notifications can be
+ * received before the reply to a command.
+ *
+ * In such cases, the notification channel client implementation internally queues them and
+ * provides them on the next calls to lttng_notification_channel_get_next_notification().
+ * This is correct with respect to the public API, which is intended to be used in "blocking
+ * mode".
+ *
+ * However, this internal user relies on poll/epoll to wake-up when data is available
+ * on the notification channel's socket. As such, it can't assume that a wake-up means only
+ * one notification is available for consumption since many of them may have been queued in
+ * the channel's internal buffers.
+ */
+ while (notification_pending) {
+ status = lttng_notification_channel_has_pending_notification(
+ rotate_notification_channel, ¬ification_pending);
+ if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+ ERR("Error occurred while checking for pending notification");
+ ret = -1;
+ goto end;
+ }
- if (!notification_pending) {
- ret = 0;
- goto end;
- }
+ if (!notification_pending) {
+ ret = 0;
+ goto end;
+ }
- /* Receive the next notification. */
- status = lttng_notification_channel_get_next_notification(rotate_notification_channel,
- ¬ification);
+ /* Receive the next notification. */
+ status = lttng_notification_channel_get_next_notification(
+ rotate_notification_channel, ¬ification);
+ switch (status) {
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
+ break;
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
+ WARN("Dropped notification detected on notification channel used by the rotation management thread.");
+ ret = 0;
+ goto end;
+ case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
+ ERR("Notification channel was closed");
+ ret = -1;
+ goto end;
+ default:
+ /* Unhandled conditions / errors. */
+ ERR("Unknown notification channel status");
+ ret = -1;
+ goto end;
+ }
- switch (status) {
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
- break;
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
- /* Not an error, we will wait for the next one */
- ret = 0;
- goto end;
- ;
- case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
- ERR("Notification channel was closed");
- ret = -1;
- goto end;
- default:
- /* Unhandled conditions / errors. */
- ERR("Unknown notification channel status");
- ret = -1;
- goto end;
+ ret = handle_condition(notification, handle->notification_thread_handle);
+ lttng_notification_destroy(notification);
+ if (ret) {
+ goto end;
+ }
}
-
- ret = handle_condition(notification, handle->notification_thread_handle);
-
end:
- lttng_notification_destroy(notification);
return ret;
}
goto error;
}
- if (fd == rotate_notification_channel->socket) {
+ if (fd == rotate_notification_channel->socket ||
+ fd == rotate_notification_channel_subscription_change_eventfd) {
ret = handle_notification_channel(fd, handle, &thread);
if (ret) {
ERR("Error occurred while handling activity on notification channel socket");
goto error;
}
+
+ if (fd == rotate_notification_channel_subscription_change_eventfd) {
+ uint64_t eventfd_value;
+ const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
+
+ if (read_ret != sizeof(eventfd_value)) {
+ PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
+ goto error;
+ }
+ }
} else {
/* Job queue or quit pipe activity. */