X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=528d1451cf769feea1a694550b5220d20f7d852d;hb=8a00688e1d58cc5a2e77eba206ff23bd6105130c;hp=41c5e1d4419d740b452528410b2345156717cfbe;hpb=0114db0ec2407029052eb61a0189c9b1cd64d520;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index 41c5e1d44..528d1451c 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -126,12 +126,6 @@ static int tracing_group_name_override; const char * const config_section_name = "relayd"; -/* - * Quit pipe for all threads. This permits a single cancellation point - * for all threads when receiving an event on the pipe. - */ -int thread_quit_pipe[2] = { -1, -1 }; - /* * This pipe is used to inform the worker thread that a command is queued and * ready to be processed. @@ -722,10 +716,7 @@ static void relayd_cleanup(void) (void) fd_tracker_util_pipe_close( the_fd_tracker, health_quit_pipe); } - if (thread_quit_pipe[0] != -1) { - (void) fd_tracker_util_pipe_close( - the_fd_tracker, thread_quit_pipe); - } + relayd_close_thread_quit_pipe(); if (sessiond_trace_chunk_registry) { sessiond_trace_chunk_registry_destroy( sessiond_trace_chunk_registry); @@ -748,23 +739,6 @@ static void relayd_cleanup(void) } } -/* - * Write to writable pipe used to notify a thread. - */ -static int notify_thread_pipe(int wpipe) -{ - ssize_t ret; - - ret = lttng_write(wpipe, "!", 1); - if (ret < 1) { - PERROR("write poll pipe"); - goto end; - } - ret = 0; -end: - return ret; -} - static int notify_health_quit_pipe(int *pipe) { ssize_t ret; @@ -788,7 +762,7 @@ int lttng_relay_stop_threads(void) /* Stopping all threads */ DBG("Terminating all threads"); - if (notify_thread_pipe(thread_quit_pipe[1])) { + if (relayd_notify_thread_quit_pipe()) { ERR("write error on thread quit pipe"); retval = -1; } @@ -892,17 +866,6 @@ void lttng_relay_notify_ready(void) } } -/* - * Init thread quit pipe. - * - * Return -1 on error or 0 if all pipes are created. - */ -static int init_thread_quit_pipe(void) -{ - return fd_tracker_util_pipe_open_cloexec( - the_fd_tracker, "Quit pipe", thread_quit_pipe); -} - /* * Init health quit pipe. * @@ -914,52 +877,6 @@ static int init_health_quit_pipe(void) "Health quit pipe", health_quit_pipe); } -/* - * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. - */ -static int create_named_thread_poll_set(struct lttng_poll_event *events, - int size, const char *name) -{ - int ret; - - if (events == NULL || size == 0) { - ret = -1; - goto error; - } - - ret = fd_tracker_util_poll_create(the_fd_tracker, - name, events, 1, LTTNG_CLOEXEC); - if (ret) { - PERROR("Failed to create \"%s\" poll file descriptor", name); - goto error; - } - - /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); - if (ret < 0) { - goto error; - } - - return 0; - -error: - return ret; -} - -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static int check_thread_quit_pipe(int fd, uint32_t events) -{ - if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} - static int create_sock(void *data, int *out_fd) { int ret; @@ -1089,8 +1006,8 @@ end: */ static void *relay_thread_listener(void *data __attribute__((unused))) { - int i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; + int i, ret, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *control_sock, *data_sock; @@ -1161,15 +1078,15 @@ restart: DBG("Relay new connection received"); for (i = 0; i < nb_fd; i++) { - health_code_update(); - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + health_code_update(); + + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -2539,12 +2456,14 @@ static ssize_t relay_unpack_rotate_streams_header( * We start by "unpacking" `stream_count` to figure out the padding length * emited by our peer. */ - memcpy(&rotate_streams.stream_count, payload->data, - sizeof(rotate_streams.stream_count)); - rotate_streams = (typeof(rotate_streams)) { - .stream_count = be32toh(rotate_streams.stream_count), - .new_chunk_id = LTTNG_OPTIONAL_INIT_UNSET, - }; + { + decltype(rotate_streams.stream_count) stream_count; + + memcpy(&stream_count, payload->data, sizeof(stream_count)); + rotate_streams.stream_count = be32toh(stream_count); + } + + rotate_streams.new_chunk_id = LTTNG_OPTIONAL_INIT_UNSET; /* * Payload size expected given the possible padding lengths in @@ -2574,13 +2493,11 @@ static ssize_t relay_unpack_rotate_streams_header( memcpy(&packed_rotate_streams, payload->data, header_len); /* Unpack the packed structure to the natively-packed version. */ - *_rotate_streams = (typeof(*_rotate_streams)) { - .stream_count = be32toh(packed_rotate_streams.stream_count), - .new_chunk_id = (typeof(_rotate_streams->new_chunk_id)) { - .is_set = !!packed_rotate_streams.new_chunk_id.is_set, - .value = be64toh(packed_rotate_streams.new_chunk_id.value), - } + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!packed_rotate_streams.new_chunk_id.is_set, + .value = be64toh(packed_rotate_streams.new_chunk_id.value), }; + _rotate_streams->stream_count = be32toh(packed_rotate_streams.stream_count); } else if (payload->size == expected_payload_size_3_bytes_padding) { struct lttcomm_relayd_rotate_streams_3_bytes_padding padded_rotate_streams; @@ -2590,13 +2507,11 @@ static ssize_t relay_unpack_rotate_streams_header( memcpy(&padded_rotate_streams, payload->data, header_len); /* Unpack the 3-byte padded structure to the natively-packed version. */ - *_rotate_streams = (typeof(*_rotate_streams)) { - .stream_count = be32toh(padded_rotate_streams.stream_count), - .new_chunk_id = (typeof(_rotate_streams->new_chunk_id)) { - .is_set = !!padded_rotate_streams.new_chunk_id.is_set, - .value = be64toh(padded_rotate_streams.new_chunk_id.value), - } + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!padded_rotate_streams.new_chunk_id.is_set, + .value = be64toh(padded_rotate_streams.new_chunk_id.value), }; + _rotate_streams->stream_count = be32toh(padded_rotate_streams.stream_count); } else if (payload->size == expected_payload_size_7_bytes_padding) { struct lttcomm_relayd_rotate_streams_7_bytes_padding padded_rotate_streams; @@ -2606,13 +2521,11 @@ static ssize_t relay_unpack_rotate_streams_header( memcpy(&padded_rotate_streams, payload->data, header_len); /* Unpack the 7-byte padded structure to the natively-packed version. */ - *_rotate_streams = (typeof(*_rotate_streams)) { - .stream_count = be32toh(padded_rotate_streams.stream_count), - .new_chunk_id = (typeof(_rotate_streams->new_chunk_id)) { - .is_set = !!padded_rotate_streams.new_chunk_id.is_set, - .value = be64toh(padded_rotate_streams.new_chunk_id.value), - } + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!padded_rotate_streams.new_chunk_id.is_set, + .value = be64toh(padded_rotate_streams.new_chunk_id.value), }; + _rotate_streams->stream_count = be32toh(padded_rotate_streams.stream_count); header_len = sizeof(padded_rotate_streams); } else { @@ -3982,14 +3895,14 @@ restart: */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -4395,7 +4308,7 @@ int main(int argc, char **argv) } /* Create thread quit pipe */ - if (init_thread_quit_pipe()) { + if (relayd_init_thread_quit_pipe()) { retval = -1; goto exit_options; }