Standardize quit pipes behavior
authorMichael Jeanson <mjeanson@efficios.com>
Tue, 10 Nov 2020 21:33:36 +0000 (16:33 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 27 Oct 2022 20:31:38 +0000 (16:31 -0400)
Standardize the behavior of the quit pipes to trigger on any poll events.

Change-Id: I0beeefcbd1a55b2aa308eb28b617487ffdeb737e
Signed-off-by: Michael Jeanson <mjeanson@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
16 files changed:
src/bin/lttng-consumerd/health-consumerd.cpp
src/bin/lttng-relayd/Makefile.am
src/bin/lttng-relayd/health-relayd.cpp
src/bin/lttng-relayd/live.cpp
src/bin/lttng-relayd/lttng-relayd.hpp
src/bin/lttng-relayd/main.cpp
src/bin/lttng-relayd/thread-utils.cpp [new file with mode: 0644]
src/bin/lttng-sessiond/agent-thread.cpp
src/bin/lttng-sessiond/client.cpp
src/bin/lttng-sessiond/health.cpp
src/bin/lttng-sessiond/manage-apps.cpp
src/bin/lttng-sessiond/manage-consumer.cpp
src/bin/lttng-sessiond/manage-kernel.cpp
src/bin/lttng-sessiond/notify-apps.cpp
src/bin/lttng-sessiond/register.cpp
src/bin/lttng-sessiond/utils.cpp

index 8417fc6dccfca4122561edeadb4b2d623dc9f029..f8be972c595986fae6c4aca3b3363a59dc39dcf3 100644 (file)
 /* Global health check unix path */
 static char health_unix_sock_path[PATH_MAX];
 
-int health_quit_pipe[2];
-
-/*
- * Check if the thread quit pipe was triggered.
- *
- * Return 1 if it was triggered else 0;
- */
-static
-int check_health_quit_pipe(int fd, uint32_t events)
-{
-       if (fd == health_quit_pipe[0] && (events & LPOLLIN)) {
-               return 1;
-       }
-
-       return 0;
-}
+int health_quit_pipe[2] = {-1, -1};
 
 /*
  * Send data on a unix socket using the liblttsessiondcomm API.
@@ -146,8 +131,8 @@ end:
  */
 void *thread_manage_health_consumerd(void *data __attribute__((unused)))
 {
-       int sock = -1, new_sock = -1, ret, i, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int sock = -1, new_sock = -1, ret, i, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct health_comm_msg msg;
        struct health_comm_reply reply;
@@ -252,12 +237,12 @@ restart:
 
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_health_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on health quit pipe, exiting. */
+                       if (pollfd == health_quit_pipe[0]) {
+                               DBG("Activity on health quit pipe");
                                err = 0;
                                goto exit;
                        }
index b473d5c62b29463eb449bc7ee096ae9a96ab672b..fce8c2346b8e9b77ea962be41540b4c83e87f7be 100644 (file)
@@ -25,7 +25,8 @@ lttng_relayd_SOURCES = main.cpp lttng-relayd.hpp utils.hpp utils.cpp cmd.hpp \
                        tracefile-array.cpp tracefile-array.hpp \
                        tcp_keep_alive.cpp tcp_keep_alive.hpp \
                        sessiond-trace-chunks.cpp sessiond-trace-chunks.hpp \
-                       backward-compatibility-group-by.cpp backward-compatibility-group-by.hpp
+                       backward-compatibility-group-by.cpp backward-compatibility-group-by.hpp \
+                       thread-utils.cpp
 
 # link on liblttngctl for check if relayd is already alive.
 lttng_relayd_LDADD = $(URCU_LIBS) \
index 8e22dfe66335f1332ff8740e6e64ee0869480411..af8487bdb838dcc8490b4e4fde020993a2d5928f 100644 (file)
@@ -47,21 +47,6 @@ char health_unix_sock_path[PATH_MAX];
 
 int health_quit_pipe[2] = { -1, -1 };
 
-/*
- * Check if the thread quit pipe was triggered.
- *
- * Return 1 if it was triggered else 0;
- */
-static
-int check_health_quit_pipe(int fd, uint32_t events)
-{
-       if (fd == health_quit_pipe[0] && (events & LPOLLIN)) {
-               return 1;
-       }
-
-       return 0;
-}
-
 /*
  * Send data on a unix socket using the liblttsessiondcomm API.
  *
@@ -261,8 +246,8 @@ end:
  */
 void *thread_manage_health_relayd(void *data __attribute__((unused)))
 {
-       int sock = -1, new_sock = -1, ret, i, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int sock = -1, new_sock = -1, ret, i, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct health_comm_msg msg;
        struct health_comm_reply reply;
@@ -379,12 +364,12 @@ restart:
 
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_health_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (pollfd == health_quit_pipe[0]) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
index abbec8c20e89e02253a0dce83a38a138909b9dff..4724a3adaab0bcbfe5d64dfc44245b57c0f202da 100644 (file)
@@ -590,54 +590,6 @@ int relayd_live_stop(void)
        return 0;
 }
 
-/*
- * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
- */
-static
-int create_named_thread_poll_set(struct lttng_poll_event *events,
-               int size, const char *name)
-{
-       int ret;
-
-       if (events == NULL || size == 0) {
-               ret = -1;
-               goto error;
-       }
-
-       ret = fd_tracker_util_poll_create(the_fd_tracker,
-                       name, events, 1, LTTNG_CLOEXEC);
-       if (ret) {
-               PERROR("Failed to create \"%s\" poll file descriptor", name);
-               goto error;
-       }
-
-       /* Add quit pipe */
-       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               goto error;
-       }
-
-       return 0;
-
-error:
-       return ret;
-}
-
-/*
- * Check if the thread quit pipe was triggered.
- *
- * Return 1 if it was triggered else 0;
- */
-static
-int check_thread_quit_pipe(int fd, uint32_t events)
-{
-       if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
-               return 1;
-       }
-
-       return 0;
-}
-
 static
 int create_sock(void *data, int *out_fd)
 {
@@ -768,8 +720,8 @@ error:
 static
 void *thread_listener(void *data __attribute__((unused)))
 {
-       int i, ret, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int i, ret, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttcomm_sock *live_control_sock;
 
@@ -826,15 +778,15 @@ restart:
 
                DBG("Relay new viewer connection received");
                for (i = 0; i < nb_fd; i++) {
-                       health_code_update();
-
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       health_code_update();
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
@@ -2729,14 +2681,14 @@ restart:
                 */
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-                       int pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
index a92ace77d34c19289e0fadb676f0900b4a96862a..32911f09c5ef54fd69cfdfe2ab4ae83524f049f9 100644 (file)
@@ -14,6 +14,7 @@
 #include <urcu.h>
 #include <urcu/wfcqueue.h>
 
+#include <common/compat/poll.hpp>
 #include <common/hashtable/hashtable.hpp>
 #include <common/fd-tracker/fd-tracker.hpp>
 
@@ -49,11 +50,17 @@ extern const char *tracing_group_name;
 extern const char * const config_section_name;
 extern enum relay_group_output_by opt_group_output_by;
 
-extern int thread_quit_pipe[2];
-
 extern struct fd_tracker *the_fd_tracker;
 
 void lttng_relay_notify_ready(void);
 int lttng_relay_stop_threads(void);
 
+int relayd_init_thread_quit_pipe(void);
+int relayd_notify_thread_quit_pipe(void);
+void relayd_close_thread_quit_pipe(void);
+bool relayd_is_thread_quit_pipe(const int fd);
+
+int create_named_thread_poll_set(struct lttng_poll_event *events,
+               int size, const char *name);
+
 #endif /* LTTNG_RELAYD_H */
index 318af66f787526f049faef36f9c4cf1a78425c71..528d1451cf769feea1a694550b5220d20f7d852d 100644 (file)
@@ -126,12 +126,6 @@ static int tracing_group_name_override;
 
 const char * const config_section_name = "relayd";
 
-/*
- * Quit pipe for all threads. This permits a single cancellation point
- * for all threads when receiving an event on the pipe.
- */
-int thread_quit_pipe[2] = { -1, -1 };
-
 /*
  * This pipe is used to inform the worker thread that a command is queued and
  * ready to be processed.
@@ -722,10 +716,7 @@ static void relayd_cleanup(void)
                (void) fd_tracker_util_pipe_close(
                                the_fd_tracker, health_quit_pipe);
        }
-       if (thread_quit_pipe[0] != -1) {
-               (void) fd_tracker_util_pipe_close(
-                               the_fd_tracker, thread_quit_pipe);
-       }
+       relayd_close_thread_quit_pipe();
        if (sessiond_trace_chunk_registry) {
                sessiond_trace_chunk_registry_destroy(
                                sessiond_trace_chunk_registry);
@@ -748,23 +739,6 @@ static void relayd_cleanup(void)
        }
 }
 
-/*
- * Write to writable pipe used to notify a thread.
- */
-static int notify_thread_pipe(int wpipe)
-{
-       ssize_t ret;
-
-       ret = lttng_write(wpipe, "!", 1);
-       if (ret < 1) {
-               PERROR("write poll pipe");
-               goto end;
-       }
-       ret = 0;
-end:
-       return ret;
-}
-
 static int notify_health_quit_pipe(int *pipe)
 {
        ssize_t ret;
@@ -788,7 +762,7 @@ int lttng_relay_stop_threads(void)
 
        /* Stopping all threads */
        DBG("Terminating all threads");
-       if (notify_thread_pipe(thread_quit_pipe[1])) {
+       if (relayd_notify_thread_quit_pipe()) {
                ERR("write error on thread quit pipe");
                retval = -1;
        }
@@ -892,17 +866,6 @@ void lttng_relay_notify_ready(void)
        }
 }
 
-/*
- * Init thread quit pipe.
- *
- * Return -1 on error or 0 if all pipes are created.
- */
-static int init_thread_quit_pipe(void)
-{
-       return fd_tracker_util_pipe_open_cloexec(
-                       the_fd_tracker, "Quit pipe", thread_quit_pipe);
-}
-
 /*
  * Init health quit pipe.
  *
@@ -914,52 +877,6 @@ static int init_health_quit_pipe(void)
                        "Health quit pipe", health_quit_pipe);
 }
 
-/*
- * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
- */
-static int create_named_thread_poll_set(struct lttng_poll_event *events,
-               int size, const char *name)
-{
-       int ret;
-
-       if (events == NULL || size == 0) {
-               ret = -1;
-               goto error;
-       }
-
-       ret = fd_tracker_util_poll_create(the_fd_tracker,
-                       name, events, 1, LTTNG_CLOEXEC);
-       if (ret) {
-               PERROR("Failed to create \"%s\" poll file descriptor", name);
-               goto error;
-       }
-
-       /* Add quit pipe */
-       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
-       if (ret < 0) {
-               goto error;
-       }
-
-       return 0;
-
-error:
-       return ret;
-}
-
-/*
- * Check if the thread quit pipe was triggered.
- *
- * Return 1 if it was triggered else 0;
- */
-static int check_thread_quit_pipe(int fd, uint32_t events)
-{
-       if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
-               return 1;
-       }
-
-       return 0;
-}
-
 static int create_sock(void *data, int *out_fd)
 {
        int ret;
@@ -1089,8 +1006,8 @@ end:
  */
 static void *relay_thread_listener(void *data __attribute__((unused)))
 {
-       int i, ret, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int i, ret, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttcomm_sock *control_sock, *data_sock;
 
@@ -1161,15 +1078,15 @@ restart:
 
                DBG("Relay new connection received");
                for (i = 0; i < nb_fd; i++) {
-                       health_code_update();
-
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       health_code_update();
+
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
@@ -3978,14 +3895,14 @@ restart:
                 */
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-                       int pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
-                       if (ret) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (relayd_is_thread_quit_pipe(pollfd)) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
@@ -4391,7 +4308,7 @@ int main(int argc, char **argv)
        }
 
        /* Create thread quit pipe */
-       if (init_thread_quit_pipe()) {
+       if (relayd_init_thread_quit_pipe()) {
                retval = -1;
                goto exit_options;
        }
diff --git a/src/bin/lttng-relayd/thread-utils.cpp b/src/bin/lttng-relayd/thread-utils.cpp
new file mode 100644 (file)
index 0000000..d469f90
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2022 EfficiOS Inc.
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "lttng-relayd.hpp"
+
+#include <common/compat/poll.hpp>
+#include <common/error.hpp>
+#include <common/fd-tracker/utils.hpp>
+#include <common/readwrite.hpp>
+#include <common/utils.hpp>
+
+/*
+ * Quit pipe for all threads. This permits a single cancellation point
+ * for all threads when receiving an event on the pipe.
+ */
+static int thread_quit_pipe[2] = { -1, -1 };
+
+/*
+ * Write to writable pipe used to notify a thread.
+ */
+static int notify_thread_pipe(int wpipe)
+{
+       const auto ret = lttng_write(wpipe, "!", 1);
+
+       if (ret < 1) {
+               PERROR("Failed to write to thread pipe");
+               return -1;
+       }
+
+       return 0;
+}
+
+/*
+ * Initialize the thread quit pipe.
+ *
+ * Return -1 on error or 0 if all pipes are created.
+ */
+int relayd_init_thread_quit_pipe(void)
+{
+       return fd_tracker_util_pipe_open_cloexec(
+                       the_fd_tracker, "Thread quit pipe", thread_quit_pipe);
+}
+
+/*
+ * Notify the threads to initiate shutdown.
+ *
+ * Return 0 on success or -1 on error.
+ */
+int relayd_notify_thread_quit_pipe(void)
+{
+       return notify_thread_pipe(thread_quit_pipe[1]);
+}
+
+/*
+ * Close the thread quit pipe.
+ */
+void relayd_close_thread_quit_pipe(void)
+{
+       if (thread_quit_pipe[0] != -1) {
+               (void) fd_tracker_util_pipe_close(
+                               the_fd_tracker, thread_quit_pipe);
+       }
+}
+
+/*
+ * Return 1 if 'fd' is the thread quit pipe read fd.
+ */
+bool relayd_is_thread_quit_pipe(const int fd)
+{
+       return (fd == thread_quit_pipe[0]);
+}
+
+/*
+ * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
+ */
+int create_named_thread_poll_set(struct lttng_poll_event *events,
+               int size, const char *name)
+{
+       if (events == NULL || size == 0) {
+               return -1;
+       }
+
+       const auto create_ret = fd_tracker_util_poll_create(the_fd_tracker,
+                       name, events, 1, LTTNG_CLOEXEC);
+       if (create_ret) {
+               PERROR("Failed to create \"%s\" poll file descriptor", name);
+               return -1;
+       }
+
+       /* Add thread quit pipe to monitored events. */
+       const auto poll_add_ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
+       if (poll_add_ret < 0) {
+               return -1;
+       }
+
+       return 0;
+}
index c8fab6164f0c68b05b9545aab7abe25bbb192749..dc8cb2961228e93584d70bde9f67015bb43b8a81 100644 (file)
@@ -355,12 +355,12 @@ void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
  */
 static void *thread_agent_management(void *data)
 {
-       int i, ret, pollfd;
-       uint32_t revents, nb_fd;
+       int i, ret;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttcomm_sock *reg_sock;
        struct thread_notifiers *notifiers = (thread_notifiers *) data;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(
+       const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(
                        notifiers->quit_pipe);
 
        DBG("Manage agent application registration.");
@@ -377,7 +377,7 @@ static void *thread_agent_management(void *data)
                goto error_poll_create;
        }
 
-       ret = lttng_poll_add(&events, quit_pipe_read_fd,
+       ret = lttng_poll_add(&events, thread_quit_pipe_fd,
                        LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error_tcp_socket;
@@ -439,11 +439,12 @@ restart:
 
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       if (pollfd == quit_pipe_read_fd) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (pollfd == thread_quit_pipe_fd) {
+                               DBG("Activity on thread quit pipe");
                                goto exit;
                        }
 
index 3a83f06b9d4ef862f5d8acf6a7439686bcf1b103..4c4cdf54092a6212c76a8dbbd0a01e656a743b4f 100644 (file)
@@ -2457,9 +2457,9 @@ static void thread_init_cleanup(void *data __attribute__((unused)))
  */
 static void *thread_manage_clients(void *data)
 {
-       int sock = -1, ret, i, pollfd, err = -1;
+       int sock = -1, ret, i, err = -1;
        int sock_error;
-       uint32_t revents, nb_fd;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        const int client_sock = thread_state.client_sock;
        struct lttng_pipe *quit_pipe = (lttng_pipe *) data;
@@ -2551,25 +2551,28 @@ static void *thread_manage_clients(void *data)
                nb_fd = ret;
 
                for (i = 0; i < nb_fd; i++) {
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       /* Fetch once the poll data. */
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
+                       /* Activity on thread quit pipe, exiting. */
                        if (pollfd == thread_quit_pipe_fd) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
+                       }
+
+                       /* Event on the registration socket */
+                       if (revents & LPOLLIN) {
+                               continue;
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("Client socket poll error");
+                               goto error;
                        } else {
-                               /* Event on the registration socket */
-                               if (revents & LPOLLIN) {
-                                       continue;
-                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Client socket poll error");
-                                       goto error;
-                               } else {
-                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                                       goto error;
-                               }
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
 
index 0735907a26ad3ae67d9fc26d6f543e8f7f0c98fa..1ba991447bd5223e17b61cdfe3767692ef9810d5 100644 (file)
@@ -54,14 +54,14 @@ static void cleanup_health_management_thread(void *data)
 static void *thread_manage_health(void *data)
 {
        const bool is_root = (getuid() == 0);
-       int sock = -1, new_sock = -1, ret, i, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int sock = -1, new_sock = -1, ret, i, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct health_comm_msg msg;
        struct health_comm_reply reply;
        /* Thread-specific quit pipe. */
        struct thread_notifiers *notifiers = (thread_notifiers *) data;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(
+       const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(
                        notifiers->quit_pipe);
 
        DBG("[thread] Manage health check started");
@@ -70,7 +70,7 @@ static void *thread_manage_health(void *data)
 
        /*
         * Created with a size of two for:
-        *   - client socket
+        *   - health client socket
         *   - thread quit pipe
         */
        ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
@@ -122,12 +122,12 @@ static void *thread_manage_health(void *data)
                goto error;
        }
 
-       ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error;
        }
 
-       /* Add the application registration socket */
+       /* Add the health client socket. */
        ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI);
        if (ret < 0) {
                goto error;
@@ -154,25 +154,26 @@ restart:
 
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
-
-                       /* Event on the registration socket */
-                       if (pollfd == sock) {
-                               if (revents & LPOLLIN) {
-                                       continue;
-                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Health socket poll error");
-                                       goto error;
-                               } else {
-                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                                       goto error;
-                               }
-                       } else {
-                               /* Event on the thread's quit pipe. */
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       /* Activity on thread quit pipe, exiting. */
+                       if (pollfd == thread_quit_pipe_fd) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
+
+                       /* Event on the health client socket. */
+                       if (revents & LPOLLIN) {
+                               continue;
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("Health socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
+                       }
                }
 
                new_sock = lttcomm_accept_unix_sock(sock);
index f8297a6df82820a975bf514702d692de7057ba21..7698ea442feff54e2bc8902334f70f93d8270b98 100644 (file)
@@ -43,12 +43,12 @@ static void cleanup_application_management_thread(void *data)
  */
 static void *thread_application_management(void *data)
 {
-       int i, ret, pollfd, err = -1;
+       int i, ret, err = -1;
        ssize_t size_ret;
-       uint32_t revents, nb_fd;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct thread_notifiers *notifiers = (thread_notifiers *) data;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(
+       const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(
                        notifiers->quit_pipe);
 
        DBG("[thread] Manage application started");
@@ -75,7 +75,7 @@ static void *thread_application_management(void *data)
                goto error;
        }
 
-       ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error;
        }
@@ -110,15 +110,19 @@ static void *thread_application_management(void *data)
 
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
-                       if (pollfd == quit_pipe_read_fd) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (pollfd == thread_quit_pipe_fd) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
-                       } else if (pollfd == notifiers->apps_cmd_pipe_read_fd) {
+                       }
+
+                       if (pollfd == notifiers->apps_cmd_pipe_read_fd) {
                                /* Inspect the apps cmd pipe */
                                if (revents & LPOLLIN) {
                                        int sock;
index a3bb816f9459457ab0a1a74379bab796283a228b..d9d1e670ec87e2da1f356c4dc2930db6005a1c83 100644 (file)
@@ -55,13 +55,13 @@ static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
  */
 static void *thread_consumer_management(void *data)
 {
-       int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
-       uint32_t revents, nb_fd;
+       int sock = -1, i, ret, err = -1, should_quit = 0;
+       uint32_t nb_fd;
        enum lttcomm_return_code code;
        struct lttng_poll_event events;
        struct thread_notifiers *notifiers = (thread_notifiers *) data;
        struct consumer_data *consumer_data = notifiers->consumer_data;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
+       const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
        struct consumer_socket *cmd_socket_wrapper = NULL;
 
        DBG("[thread] Manage consumer started");
@@ -83,7 +83,7 @@ static void *thread_consumer_management(void *data)
                goto error_poll;
        }
 
-       ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR);
        if (ret < 0) {
                mark_thread_intialization_as_failed(notifiers);
                goto error;
@@ -121,13 +121,14 @@ static void *thread_consumer_management(void *data)
 
        for (i = 0; i < nb_fd; i++) {
                /* Fetch once the poll data */
-               revents = LTTNG_POLL_GETEV(&events, i);
-               pollfd = LTTNG_POLL_GETFD(&events, i);
+               const auto revents = LTTNG_POLL_GETEV(&events, i);
+               const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                health_code_update();
 
-               /* Thread quit pipe has been closed. Killing thread. */
-               if (pollfd == quit_pipe_read_fd) {
+               /* Activity on thread quit pipe, exiting. */
+               if (pollfd == thread_quit_pipe_fd) {
+                       DBG("Activity on thread quit pipe");
                        err = 0;
                        mark_thread_intialization_as_failed(notifiers);
                        goto exit;
@@ -290,8 +291,8 @@ static void *thread_consumer_management(void *data)
 
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
@@ -300,7 +301,7 @@ static void *thread_consumer_management(void *data)
                         * but continue the current loop to handle potential data from
                         * consumer.
                         */
-                       if (pollfd == quit_pipe_read_fd) {
+                       if (pollfd == thread_quit_pipe_fd) {
                                should_quit = 1;
                        } else if (pollfd == sock) {
                                /* Event on the consumerd socket */
index 8d2795466bb32400beda37fd2757aa1b94cb5441..0920852501d486e2d1a6bbdf7ba07189a54a6423 100644 (file)
@@ -165,12 +165,12 @@ error:
  */
 static void *thread_kernel_management(void *data)
 {
-       int ret, i, pollfd, update_poll_flag = 1, err = -1;
-       uint32_t revents, nb_fd;
+       int ret, i, update_poll_flag = 1, err = -1;
+       uint32_t nb_fd;
        char tmp;
        struct lttng_poll_event events;
        struct thread_notifiers *notifiers = (thread_notifiers *) data;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
+       const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
 
        DBG("[thread] Thread manage kernel started");
 
@@ -212,7 +212,7 @@ static void *thread_kernel_management(void *data)
                        }
 
                        ret = lttng_poll_add(&events,
-                                       quit_pipe_read_fd,
+                                       thread_quit_pipe_fd,
                                        LPOLLIN);
                        if (ret < 0) {
                                goto error;
@@ -254,12 +254,14 @@ static void *thread_kernel_management(void *data)
 
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        health_code_update();
 
-                       if (pollfd == quit_pipe_read_fd) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (pollfd == thread_quit_pipe_fd) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
                        }
index 1fff92d071ba01f1f70aac518493194c791485e0..1b83491d14b920686898a77a0f249bda42223520 100644 (file)
@@ -30,12 +30,12 @@ struct thread_notifiers {
  */
 static void *thread_application_notification(void *data)
 {
-       int i, ret, pollfd, err = -1;
+       int i, ret, err = -1;
        ssize_t size_ret;
-       uint32_t revents, nb_fd;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        struct thread_notifiers *notifiers = (thread_notifiers *) data;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
+       const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
 
        DBG("[ust-thread] Manage application notify command");
 
@@ -63,7 +63,7 @@ static void *thread_application_notification(void *data)
                goto error;
        }
 
-       ret = lttng_poll_add(&events, quit_pipe_read_fd,
+       ret = lttng_poll_add(&events, thread_quit_pipe_fd,
                        LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error;
@@ -97,14 +97,17 @@ restart:
                        health_code_update();
 
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       if (pollfd == quit_pipe_read_fd) {
+                       /* Activity on thread quit pipe, exiting. */
+                       if (pollfd == thread_quit_pipe_fd) {
+                               DBG("Activity on thread quit pipe");
                                err = 0;
                                goto exit;
-                       } else if (pollfd == notifiers->apps_cmd_notify_pipe_read_fd) {
+                       }
+
+                       if (pollfd == notifiers->apps_cmd_notify_pipe_read_fd) {
                                /* Inspect the apps cmd pipe */
                                int sock;
 
index 34efeac40f57729fd0811806e56cb7665847fefc..4478221ff530092307ac563fcda14157fe6818c5 100644 (file)
@@ -153,8 +153,8 @@ static void thread_init_cleanup(void *data)
  */
 static void *thread_application_registration(void *data)
 {
-       int sock = -1, i, ret, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int sock = -1, i, ret, err = -1;
+       uint32_t nb_fd;
        struct lttng_poll_event events;
        /*
         * Gets allocated in this thread, enqueued to a global queue, dequeued
@@ -164,7 +164,7 @@ static void *thread_application_registration(void *data)
        const bool is_root = (getuid() == 0);
        struct thread_state *thread_state = (struct thread_state *) data;
        const int application_socket = thread_state->application_socket;
-       const int quit_pipe_read_fd = lttng_pipe_get_readfd(
+       const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(
                        thread_state->quit_pipe);
 
        DBG("[thread] Manage application registration started");
@@ -193,7 +193,7 @@ static void *thread_application_registration(void *data)
        }
 
        /* Add the application registration socket */
-       ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLRDHUP);
+       ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLRDHUP);
        if (ret < 0) {
                goto error_poll_add;
        }
@@ -229,116 +229,116 @@ static void *thread_application_registration(void *data)
                        health_code_update();
 
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&events, i);
+                       const auto pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Thread quit pipe has been closed. Killing thread. */
-                       if (pollfd == quit_pipe_read_fd) {
+                       /* Activity on thread quit pipe, closing. */
+                       if (pollfd == thread_quit_pipe_fd) {
                                err = 0;
                                goto exit;
-                       } else {
-                               /* Event on the registration socket */
-                               if (revents & LPOLLIN) {
-                                       sock = lttcomm_accept_unix_sock(application_socket);
-                                       if (sock < 0) {
-                                               goto error;
-                                       }
+                       }
 
-                                       /*
-                                        * Set socket timeout for both receiving and ending.
-                                        * app_socket_timeout is in seconds, whereas
-                                        * lttcomm_setsockopt_rcv_timeout and
-                                        * lttcomm_setsockopt_snd_timeout expect msec as
-                                        * parameter.
-                                        */
-                                       if (the_config.app_socket_timeout >= 0) {
-                                               (void) lttcomm_setsockopt_rcv_timeout(sock,
-                                                               the_config.app_socket_timeout * 1000);
-                                               (void) lttcomm_setsockopt_snd_timeout(sock,
-                                                               the_config.app_socket_timeout * 1000);
-                                       }
+                       /* Event on the registration socket. */
+                       if (revents & LPOLLIN) {
+                               sock = lttcomm_accept_unix_sock(application_socket);
+                               if (sock < 0) {
+                                       goto error;
+                               }
 
-                                       /*
-                                        * Set the CLOEXEC flag. Return code is useless because
-                                        * either way, the show must go on.
-                                        */
-                                       (void) utils_set_fd_cloexec(sock);
-
-                                       /* Create UST registration command for enqueuing */
-                                       ust_cmd = zmalloc<ust_command>();
-                                       if (ust_cmd == NULL) {
-                                               PERROR("ust command zmalloc");
-                                               ret = close(sock);
-                                               if (ret) {
-                                                       PERROR("close");
-                                               }
-                                               sock = -1;
-                                               goto error;
-                                       }
+                               /*
+                                * Set socket timeout for both receiving and ending.
+                                * app_socket_timeout is in seconds, whereas
+                                * lttcomm_setsockopt_rcv_timeout and
+                                * lttcomm_setsockopt_snd_timeout expect msec as
+                                * parameter.
+                                */
+                               if (the_config.app_socket_timeout >= 0) {
+                                       (void) lttcomm_setsockopt_rcv_timeout(sock,
+                                                       the_config.app_socket_timeout * 1000);
+                                       (void) lttcomm_setsockopt_snd_timeout(sock,
+                                                       the_config.app_socket_timeout * 1000);
+                               }
 
-                                       /*
-                                        * Using message-based transmissions to ensure we don't
-                                        * have to deal with partially received messages.
-                                        */
-                                       ret = lttng_fd_get(LTTNG_FD_APPS, 1);
-                                       if (ret < 0) {
-                                               ERR("Exhausted file descriptors allowed for applications.");
-                                               free(ust_cmd);
-                                               ret = close(sock);
-                                               if (ret) {
-                                                       PERROR("close");
-                                               }
-                                               sock = -1;
-                                               continue;
+                               /*
+                                * Set the CLOEXEC flag. Return code is useless because
+                                * either way, the show must go on.
+                                */
+                               (void) utils_set_fd_cloexec(sock);
+
+                               /* Create UST registration command for enqueuing */
+                               ust_cmd = zmalloc<ust_command>();
+                               if (ust_cmd == NULL) {
+                                       PERROR("ust command zmalloc");
+                                       ret = close(sock);
+                                       if (ret) {
+                                               PERROR("close");
                                        }
+                                       sock = -1;
+                                       goto error;
+                               }
 
-                                       health_code_update();
-                                       ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg);
-                                       if (ret < 0) {
-                                               free(ust_cmd);
-                                               /* Close socket of the application. */
-                                               ret = close(sock);
-                                               if (ret) {
-                                                       PERROR("close");
-                                               }
-                                               lttng_fd_put(LTTNG_FD_APPS, 1);
-                                               sock = -1;
-                                               continue;
+                               /*
+                                * Using message-based transmissions to ensure we don't
+                                * have to deal with partially received messages.
+                                */
+                               ret = lttng_fd_get(LTTNG_FD_APPS, 1);
+                               if (ret < 0) {
+                                       ERR("Exhausted file descriptors allowed for applications.");
+                                       free(ust_cmd);
+                                       ret = close(sock);
+                                       if (ret) {
+                                               PERROR("close");
                                        }
-                                       health_code_update();
-
-                                       ust_cmd->sock = sock;
                                        sock = -1;
+                                       continue;
+                               }
 
-                                       DBG("UST registration received with pid:%d ppid:%d uid:%d"
-                                                       " gid:%d sock:%d name:%s (version %d.%d)",
-                                                       ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
-                                                       ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
-                                                       ust_cmd->sock, ust_cmd->reg_msg.name,
-                                                       ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
-
-                                       /*
-                                        * Lock free enqueue the registration request. The red pill
-                                        * has been taken! This apps will be part of the *system*.
-                                        */
-                                       cds_wfcq_head_ptr_t head;
-                                       head.h = &thread_state->ust_cmd_queue->head;
-                                       cds_wfcq_enqueue(head,
-                                                       &thread_state->ust_cmd_queue->tail,
-                                                       &ust_cmd->node);
-
-                                       /*
-                                        * Wake the registration queue futex. Implicit memory
-                                        * barrier with the exchange in cds_wfcq_enqueue.
-                                        */
-                                       futex_nto1_wake(&thread_state->ust_cmd_queue->futex);
-                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Register apps socket poll error");
-                                       goto error;
-                               } else {
-                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
-                                       goto error;
+                               health_code_update();
+                               ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg);
+                               if (ret < 0) {
+                                       free(ust_cmd);
+                                       /* Close socket of the application. */
+                                       ret = close(sock);
+                                       if (ret) {
+                                               PERROR("close");
+                                       }
+                                       lttng_fd_put(LTTNG_FD_APPS, 1);
+                                       sock = -1;
+                                       continue;
                                }
+                               health_code_update();
+
+                               ust_cmd->sock = sock;
+                               sock = -1;
+
+                               DBG("UST registration received with pid:%d ppid:%d uid:%d"
+                                               " gid:%d sock:%d name:%s (version %d.%d)",
+                                               ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
+                                               ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
+                                               ust_cmd->sock, ust_cmd->reg_msg.name,
+                                               ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
+
+                               /*
+                                * Lock free enqueue the registration request. The red pill
+                                * has been taken! This apps will be part of the *system*.
+                                */
+                               cds_wfcq_head_ptr_t head;
+                               head.h = &thread_state->ust_cmd_queue->head;
+                               cds_wfcq_enqueue(head,
+                                               &thread_state->ust_cmd_queue->tail,
+                                               &ust_cmd->node);
+
+                               /*
+                                * Wake the registration queue futex. Implicit memory
+                                * barrier with the exchange in cds_wfcq_enqueue.
+                                */
+                               futex_nto1_wake(&thread_state->ust_cmd_queue->futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("Register apps socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
index f3446bc26c5b660a18e2611617f6c2fb4e7ce17b..8fec340403c4994fcee00a0678dc071906c66554 100644 (file)
@@ -30,7 +30,8 @@ int notify_thread_pipe(int wpipe)
 
        ret = lttng_write(wpipe, "!", 1);
        if (ret < 1) {
-               PERROR("write poll pipe");
+               ret = -1;
+               PERROR("Failed to write to thread pipe");
        }
 
        return (int) ret;
This page took 0.044418 seconds and 4 git commands to generate.