X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=528d1451cf769feea1a694550b5220d20f7d852d;hb=8a00688e1d58cc5a2e77eba206ff23bd6105130c;hp=99f7920103df184faf75075d7db9ce47979f6422;hpb=f46376a14da2eb796690cb4e718e8b213839d6ea;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index 99f792010..528d1451c 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -36,45 +36,45 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "backward-compatibility-group-by.h" -#include "cmd.h" -#include "connection.h" -#include "ctf-trace.h" -#include "health-relayd.h" -#include "index.h" -#include "live.h" -#include "lttng-relayd.h" -#include "session.h" -#include "sessiond-trace-chunks.h" -#include "stream.h" -#include "tcp_keep_alive.h" -#include "testpoint.h" -#include "tracefile-array.h" -#include "utils.h" -#include "version.h" -#include "viewer-stream.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "backward-compatibility-group-by.hpp" +#include "cmd.hpp" +#include "connection.hpp" +#include "ctf-trace.hpp" +#include "health-relayd.hpp" +#include "index.hpp" +#include "live.hpp" +#include "lttng-relayd.hpp" +#include "session.hpp" +#include "sessiond-trace-chunks.hpp" +#include "stream.hpp" +#include "tcp_keep_alive.hpp" +#include "testpoint.hpp" +#include "tracefile-array.hpp" +#include "utils.hpp" +#include "version.hpp" +#include "viewer-stream.hpp" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -126,12 +126,6 @@ static int tracing_group_name_override; const char * const config_section_name = "relayd"; -/* - * Quit pipe for all threads. This permits a single cancellation point - * for all threads when receiving an event on the pipe. - */ -int thread_quit_pipe[2] = { -1, -1 }; - /* * This pipe is used to inform the worker thread that a command is queued and * ready to be processed. @@ -722,10 +716,7 @@ static void relayd_cleanup(void) (void) fd_tracker_util_pipe_close( the_fd_tracker, health_quit_pipe); } - if (thread_quit_pipe[0] != -1) { - (void) fd_tracker_util_pipe_close( - the_fd_tracker, thread_quit_pipe); - } + relayd_close_thread_quit_pipe(); if (sessiond_trace_chunk_registry) { sessiond_trace_chunk_registry_destroy( sessiond_trace_chunk_registry); @@ -748,23 +739,6 @@ static void relayd_cleanup(void) } } -/* - * Write to writable pipe used to notify a thread. - */ -static int notify_thread_pipe(int wpipe) -{ - ssize_t ret; - - ret = lttng_write(wpipe, "!", 1); - if (ret < 1) { - PERROR("write poll pipe"); - goto end; - } - ret = 0; -end: - return ret; -} - static int notify_health_quit_pipe(int *pipe) { ssize_t ret; @@ -788,7 +762,7 @@ int lttng_relay_stop_threads(void) /* Stopping all threads */ DBG("Terminating all threads"); - if (notify_thread_pipe(thread_quit_pipe[1])) { + if (relayd_notify_thread_quit_pipe()) { ERR("write error on thread quit pipe"); retval = -1; } @@ -892,17 +866,6 @@ void lttng_relay_notify_ready(void) } } -/* - * Init thread quit pipe. - * - * Return -1 on error or 0 if all pipes are created. - */ -static int init_thread_quit_pipe(void) -{ - return fd_tracker_util_pipe_open_cloexec( - the_fd_tracker, "Quit pipe", thread_quit_pipe); -} - /* * Init health quit pipe. * @@ -914,52 +877,6 @@ static int init_health_quit_pipe(void) "Health quit pipe", health_quit_pipe); } -/* - * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. - */ -static int create_named_thread_poll_set(struct lttng_poll_event *events, - int size, const char *name) -{ - int ret; - - if (events == NULL || size == 0) { - ret = -1; - goto error; - } - - ret = fd_tracker_util_poll_create(the_fd_tracker, - name, events, 1, LTTNG_CLOEXEC); - if (ret) { - PERROR("Failed to create \"%s\" poll file descriptor", name); - goto error; - } - - /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); - if (ret < 0) { - goto error; - } - - return 0; - -error: - return ret; -} - -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static int check_thread_quit_pipe(int fd, uint32_t events) -{ - if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} - static int create_sock(void *data, int *out_fd) { int ret; @@ -1089,8 +1006,8 @@ end: */ static void *relay_thread_listener(void *data __attribute__((unused))) { - int i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; + int i, ret, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *control_sock, *data_sock; @@ -1161,15 +1078,15 @@ restart: DBG("Relay new connection received"); for (i = 0; i < nb_fd; i++) { - health_code_update(); - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + health_code_update(); + + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -1329,7 +1246,7 @@ static void *relay_thread_dispatcher(void *data __attribute__((unused))) /* Continue thread execution */ break; } - new_conn = caa_container_of(node, struct relay_connection, qnode); + new_conn = lttng::utils::container_of(node, &relay_connection::qnode); DBG("Dispatching request waiting on sock %d", new_conn->sock->fd); @@ -2539,11 +2456,14 @@ static ssize_t relay_unpack_rotate_streams_header( * We start by "unpacking" `stream_count` to figure out the padding length * emited by our peer. */ - memcpy(&rotate_streams.stream_count, payload->data, - sizeof(rotate_streams.stream_count)); - rotate_streams = (typeof(rotate_streams)) { - .stream_count = be32toh(rotate_streams.stream_count), - }; + { + decltype(rotate_streams.stream_count) stream_count; + + memcpy(&stream_count, payload->data, sizeof(stream_count)); + rotate_streams.stream_count = be32toh(stream_count); + } + + rotate_streams.new_chunk_id = LTTNG_OPTIONAL_INIT_UNSET; /* * Payload size expected given the possible padding lengths in @@ -2573,13 +2493,11 @@ static ssize_t relay_unpack_rotate_streams_header( memcpy(&packed_rotate_streams, payload->data, header_len); /* Unpack the packed structure to the natively-packed version. */ - *_rotate_streams = (typeof(*_rotate_streams)) { - .stream_count = be32toh(packed_rotate_streams.stream_count), - .new_chunk_id = (typeof(_rotate_streams->new_chunk_id)) { - .is_set = !!packed_rotate_streams.new_chunk_id.is_set, - .value = be64toh(packed_rotate_streams.new_chunk_id.value), - } + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!packed_rotate_streams.new_chunk_id.is_set, + .value = be64toh(packed_rotate_streams.new_chunk_id.value), }; + _rotate_streams->stream_count = be32toh(packed_rotate_streams.stream_count); } else if (payload->size == expected_payload_size_3_bytes_padding) { struct lttcomm_relayd_rotate_streams_3_bytes_padding padded_rotate_streams; @@ -2589,13 +2507,11 @@ static ssize_t relay_unpack_rotate_streams_header( memcpy(&padded_rotate_streams, payload->data, header_len); /* Unpack the 3-byte padded structure to the natively-packed version. */ - *_rotate_streams = (typeof(*_rotate_streams)) { - .stream_count = be32toh(padded_rotate_streams.stream_count), - .new_chunk_id = (typeof(_rotate_streams->new_chunk_id)) { - .is_set = !!padded_rotate_streams.new_chunk_id.is_set, - .value = be64toh(padded_rotate_streams.new_chunk_id.value), - } + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!padded_rotate_streams.new_chunk_id.is_set, + .value = be64toh(padded_rotate_streams.new_chunk_id.value), }; + _rotate_streams->stream_count = be32toh(padded_rotate_streams.stream_count); } else if (payload->size == expected_payload_size_7_bytes_padding) { struct lttcomm_relayd_rotate_streams_7_bytes_padding padded_rotate_streams; @@ -2605,13 +2521,11 @@ static ssize_t relay_unpack_rotate_streams_header( memcpy(&padded_rotate_streams, payload->data, header_len); /* Unpack the 7-byte padded structure to the natively-packed version. */ - *_rotate_streams = (typeof(*_rotate_streams)) { - .stream_count = be32toh(padded_rotate_streams.stream_count), - .new_chunk_id = (typeof(_rotate_streams->new_chunk_id)) { - .is_set = !!padded_rotate_streams.new_chunk_id.is_set, - .value = be64toh(padded_rotate_streams.new_chunk_id.value), - } + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!padded_rotate_streams.new_chunk_id.is_set, + .value = be64toh(padded_rotate_streams.new_chunk_id.value), }; + _rotate_streams->stream_count = be32toh(padded_rotate_streams.stream_count); header_len = sizeof(padded_rotate_streams); } else { @@ -3981,14 +3895,14 @@ restart: */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -4394,7 +4308,7 @@ int main(int argc, char **argv) } /* Create thread quit pipe */ - if (init_thread_quit_pipe()) { + if (relayd_init_thread_quit_pipe()) { retval = -1; goto exit_options; }