X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=528d1451cf769feea1a694550b5220d20f7d852d;hb=8a00688e1d58cc5a2e77eba206ff23bd6105130c;hp=1da83086e82b4d7107fc4d36e5c97d1cf173b388;hpb=ff2aa8f08b117f2a309ed49cf68b135f930093f8;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index 1da83086e..528d1451c 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -36,44 +36,45 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "backward-compatibility-group-by.h" -#include "cmd.h" -#include "connection.h" -#include "ctf-trace.h" -#include "health-relayd.h" -#include "index.h" -#include "live.h" -#include "lttng-relayd.h" -#include "session.h" -#include "sessiond-trace-chunks.h" -#include "stream.h" -#include "tcp_keep_alive.h" -#include "testpoint.h" -#include "tracefile-array.h" -#include "utils.h" -#include "version.h" -#include "viewer-stream.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "backward-compatibility-group-by.hpp" +#include "cmd.hpp" +#include "connection.hpp" +#include "ctf-trace.hpp" +#include "health-relayd.hpp" +#include "index.hpp" +#include "live.hpp" +#include "lttng-relayd.hpp" +#include "session.hpp" +#include "sessiond-trace-chunks.hpp" +#include "stream.hpp" +#include "tcp_keep_alive.hpp" +#include "testpoint.hpp" +#include "tracefile-array.hpp" +#include "utils.hpp" +#include "version.hpp" +#include "viewer-stream.hpp" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -96,6 +97,11 @@ char *opt_output_path, *opt_working_directory; static int opt_daemon, opt_background, opt_print_version, opt_allow_clear = 1; enum relay_group_output_by opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_UNKNOWN; +/* Argument variables */ +int lttng_opt_quiet; /* not static in error.h */ +int lttng_opt_verbose; /* not static in error.h */ +int lttng_opt_mi; /* not static in error.h */ + /* * We need to wait for listener and live listener threads, as well as * health check thread, before being ready to signal readiness. @@ -120,12 +126,6 @@ static int tracing_group_name_override; const char * const config_section_name = "relayd"; -/* - * Quit pipe for all threads. This permits a single cancellation point - * for all threads when receiving an event on the pipe. - */ -int thread_quit_pipe[2] = { -1, -1 }; - /* * This pipe is used to inform the worker thread that a command is queued and * ready to be processed. @@ -399,7 +399,8 @@ end: * See config_entry_handler_cb comment in common/config/session-config.h for the * return value conventions. */ -static int config_entry_handler(const struct config_entry *entry, void *unused) +static int config_entry_handler(const struct config_entry *entry, + void *unused __attribute__((unused))) { int ret = 0, i; @@ -670,7 +671,8 @@ static void print_global_objects(void) print_sessions(); } -static int noop_close(void *data, int *fds) +static int noop_close(void *data __attribute__((unused)), + int *fds __attribute__((unused))) { return 0; } @@ -714,10 +716,7 @@ static void relayd_cleanup(void) (void) fd_tracker_util_pipe_close( the_fd_tracker, health_quit_pipe); } - if (thread_quit_pipe[0] != -1) { - (void) fd_tracker_util_pipe_close( - the_fd_tracker, thread_quit_pipe); - } + relayd_close_thread_quit_pipe(); if (sessiond_trace_chunk_registry) { sessiond_trace_chunk_registry_destroy( sessiond_trace_chunk_registry); @@ -740,23 +739,6 @@ static void relayd_cleanup(void) } } -/* - * Write to writable pipe used to notify a thread. - */ -static int notify_thread_pipe(int wpipe) -{ - ssize_t ret; - - ret = lttng_write(wpipe, "!", 1); - if (ret < 1) { - PERROR("write poll pipe"); - goto end; - } - ret = 0; -end: - return ret; -} - static int notify_health_quit_pipe(int *pipe) { ssize_t ret; @@ -780,7 +762,7 @@ int lttng_relay_stop_threads(void) /* Stopping all threads */ DBG("Terminating all threads"); - if (notify_thread_pipe(thread_quit_pipe[1])) { + if (relayd_notify_thread_quit_pipe()) { ERR("write error on thread quit pipe"); retval = -1; } @@ -884,17 +866,6 @@ void lttng_relay_notify_ready(void) } } -/* - * Init thread quit pipe. - * - * Return -1 on error or 0 if all pipes are created. - */ -static int init_thread_quit_pipe(void) -{ - return fd_tracker_util_pipe_open_cloexec( - the_fd_tracker, "Quit pipe", thread_quit_pipe); -} - /* * Init health quit pipe. * @@ -906,52 +877,6 @@ static int init_health_quit_pipe(void) "Health quit pipe", health_quit_pipe); } -/* - * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. - */ -static int create_named_thread_poll_set(struct lttng_poll_event *events, - int size, const char *name) -{ - int ret; - - if (events == NULL || size == 0) { - ret = -1; - goto error; - } - - ret = fd_tracker_util_poll_create(the_fd_tracker, - name, events, 1, LTTNG_CLOEXEC); - if (ret) { - PERROR("Failed to create \"%s\" poll file descriptor", name); - goto error; - } - - /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); - if (ret < 0) { - goto error; - } - - return 0; - -error: - return ret; -} - -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static int check_thread_quit_pipe(int fd, uint32_t events) -{ - if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} - static int create_sock(void *data, int *out_fd) { int ret; @@ -967,7 +892,7 @@ end: return ret; } -static int close_sock(void *data, int *in_fd) +static int close_sock(void *data, int *in_fd __attribute__((unused))) { struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -1079,10 +1004,10 @@ end: /* * This thread manages the listening for new connections on the network */ -static void *relay_thread_listener(void *data) +static void *relay_thread_listener(void *data __attribute__((unused))) { - int i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; + int i, ret, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *control_sock, *data_sock; @@ -1153,15 +1078,15 @@ restart: DBG("Relay new connection received"); for (i = 0; i < nb_fd; i++) { - health_code_update(); - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + health_code_update(); + + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -1283,7 +1208,7 @@ error_sock_control: /* * This thread manages the dispatching of the requests to worker threads */ -static void *relay_thread_dispatcher(void *data) +static void *relay_thread_dispatcher(void *data __attribute__((unused))) { int err = -1; ssize_t ret; @@ -1321,7 +1246,7 @@ static void *relay_thread_dispatcher(void *data) /* Continue thread execution */ break; } - new_conn = caa_container_of(node, struct relay_connection, qnode); + new_conn = lttng::utils::container_of(node, &relay_connection::qnode); DBG("Dispatching request waiting on sock %d", new_conn->sock->fd); @@ -1370,7 +1295,8 @@ static bool session_streams_have_index(const struct relay_session *session) * * On success, send back the session id or else return a negative value. */ -static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_create_session( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1562,7 +1488,8 @@ end: /* * relay_add_stream: allocate a new stream for a session */ -static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_add_stream( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1701,7 +1628,8 @@ end_no_session: /* * relay_close_stream: close a specific stream */ -static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_close_stream( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1753,24 +1681,6 @@ static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, * request. */ try_stream_close(stream); - if (stream->is_metadata) { - struct relay_viewer_stream *vstream; - - vstream = viewer_stream_get_by_id(stream->stream_handle); - if (vstream) { - if (stream->no_new_metadata_notified) { - /* - * Since all the metadata has been sent to the - * viewer and that we have a request to close - * its stream, we can safely teardown the - * corresponding metadata viewer stream. - */ - viewer_stream_put(vstream); - } - /* Put local reference. */ - viewer_stream_put(vstream); - } - } stream_put(stream); ret = 0; @@ -1797,7 +1707,8 @@ end_no_session: * relay_reset_metadata: reset a metadata stream */ static -int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr, +int relay_reset_metadata( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1896,9 +1807,10 @@ static void relay_unknown_command(struct relay_connection *conn) * relay_start: send an acknowledgment to the client to tell if we are * ready to receive data. We are ready if a session is established. */ -static int relay_start(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_start( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, - const struct lttng_buffer_view *payload) + const struct lttng_buffer_view *payload __attribute__((unused))) { int ret = 0; ssize_t send_ret; @@ -1989,7 +1901,8 @@ end: /* * relay_send_version: send relayd version number */ -static int relay_send_version(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_send_version( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2059,7 +1972,8 @@ end: /* * Check for data pending for a given stream id from the session daemon. */ -static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_data_pending( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2148,7 +2062,8 @@ end_no_session: * the control socket has been handled. So, this is why we simply return * OK here. */ -static int relay_quiescent_control(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_quiescent_control( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2288,7 +2203,8 @@ end_no_session: * * Return to the client if there is data in flight or not with a ret_code. */ -static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_end_data_pending( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2380,7 +2296,8 @@ end_no_session: * * Return 0 on success else a negative value. */ -static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_recv_index( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2468,9 +2385,10 @@ end_no_session: * * Return 0 on success else a negative value. */ -static int relay_streams_sent(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_streams_sent( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, - const struct lttng_buffer_view *payload) + const struct lttng_buffer_view *payload __attribute__((unused))) { int ret; ssize_t send_ret; @@ -2508,12 +2426,128 @@ end_no_session: return ret; } +static ssize_t relay_unpack_rotate_streams_header( + const struct lttng_buffer_view *payload, + struct lttcomm_relayd_rotate_streams *_rotate_streams) +{ + struct lttcomm_relayd_rotate_streams rotate_streams; + /* + * Set to the smallest version (packed) of `lttcomm_relayd_rotate_streams`. + * This is the smallest version of this structure, but it can be larger; + * this variable is updated once the proper size of the structure is known. + * + * See comment at the declaration of this structure for more information. + */ + ssize_t header_len = sizeof(struct lttcomm_relayd_rotate_streams_packed); + size_t expected_payload_size_no_padding, + expected_payload_size_3_bytes_padding, + expected_payload_size_7_bytes_padding; + + if (payload->size < header_len) { + ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes", + header_len, payload->size); + goto error; + } + + /* + * Some versions incorrectly omitted the LTTNG_PACKED annotation on the + * `new_chunk_id` optional field of struct lttcomm_relayd_rotate_streams. + * + * We start by "unpacking" `stream_count` to figure out the padding length + * emited by our peer. + */ + { + decltype(rotate_streams.stream_count) stream_count; + + memcpy(&stream_count, payload->data, sizeof(stream_count)); + rotate_streams.stream_count = be32toh(stream_count); + } + + rotate_streams.new_chunk_id = LTTNG_OPTIONAL_INIT_UNSET; + + /* + * Payload size expected given the possible padding lengths in + * `struct lttcomm_relayd_rotate_streams`. + */ + expected_payload_size_no_padding = (rotate_streams.stream_count * + sizeof(*rotate_streams.rotation_positions)) + + sizeof(lttcomm_relayd_rotate_streams_packed); + expected_payload_size_3_bytes_padding = (rotate_streams.stream_count * + sizeof(*rotate_streams.rotation_positions)) + + sizeof(lttcomm_relayd_rotate_streams_3_bytes_padding); + expected_payload_size_7_bytes_padding = (rotate_streams.stream_count * + sizeof(*rotate_streams.rotation_positions)) + + sizeof(lttcomm_relayd_rotate_streams_7_bytes_padding); + + if (payload->size == expected_payload_size_no_padding) { + struct lttcomm_relayd_rotate_streams_packed packed_rotate_streams; + + /* + * This handles cases where someone might build with + * -fpack-struct or any other toolchain that wouldn't produce + * padding to align `value`. + */ + DBG("Received `struct lttcomm_relayd_rotate_streams` with no padding"); + + header_len = sizeof(packed_rotate_streams); + memcpy(&packed_rotate_streams, payload->data, header_len); + + /* Unpack the packed structure to the natively-packed version. */ + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!packed_rotate_streams.new_chunk_id.is_set, + .value = be64toh(packed_rotate_streams.new_chunk_id.value), + }; + _rotate_streams->stream_count = be32toh(packed_rotate_streams.stream_count); + } else if (payload->size == expected_payload_size_3_bytes_padding) { + struct lttcomm_relayd_rotate_streams_3_bytes_padding padded_rotate_streams; + + DBG("Received `struct lttcomm_relayd_rotate_streams` with 3 bytes of padding (4-byte aligned peer)"); + + header_len = sizeof(padded_rotate_streams); + memcpy(&padded_rotate_streams, payload->data, header_len); + + /* Unpack the 3-byte padded structure to the natively-packed version. */ + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!padded_rotate_streams.new_chunk_id.is_set, + .value = be64toh(padded_rotate_streams.new_chunk_id.value), + }; + _rotate_streams->stream_count = be32toh(padded_rotate_streams.stream_count); + } else if (payload->size == expected_payload_size_7_bytes_padding) { + struct lttcomm_relayd_rotate_streams_7_bytes_padding padded_rotate_streams; + + DBG("Received `struct lttcomm_relayd_rotate_streams` with 7 bytes of padding (8-byte aligned peer)"); + + header_len = sizeof(padded_rotate_streams); + memcpy(&padded_rotate_streams, payload->data, header_len); + + /* Unpack the 7-byte padded structure to the natively-packed version. */ + _rotate_streams->new_chunk_id = (typeof(_rotate_streams->new_chunk_id)){ + .is_set = !!padded_rotate_streams.new_chunk_id.is_set, + .value = be64toh(padded_rotate_streams.new_chunk_id.value), + }; + _rotate_streams->stream_count = be32toh(padded_rotate_streams.stream_count); + + header_len = sizeof(padded_rotate_streams); + } else { + ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected %zu, %zu or %zu bytes, got %zu bytes", + expected_payload_size_no_padding, + expected_payload_size_3_bytes_padding, + expected_payload_size_7_bytes_padding, + payload->size); + goto error; + } + + return header_len; +error: + return -1; +} + /* * relay_rotate_session_stream: rotate a stream to a new tracefile for the * session rotation feature (not the tracefile rotation feature). */ static int relay_rotate_session_streams( - const struct lttcomm_relayd_hdr *recv_hdr, + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2525,11 +2559,11 @@ static int relay_rotate_session_streams( struct lttcomm_relayd_rotate_streams rotate_streams; struct lttcomm_relayd_generic_reply reply = {}; struct relay_stream *stream = NULL; - const size_t header_len = sizeof(struct lttcomm_relayd_rotate_streams); struct lttng_trace_chunk *next_trace_chunk = NULL; struct lttng_buffer_view stream_positions; char chunk_id_buf[MAX_INT_DEC_LEN(uint64_t)]; const char *chunk_id_str = "none"; + ssize_t header_len; if (!session || !conn->version_check_done) { ERR("Trying to rotate a stream before version check"); @@ -2543,24 +2577,12 @@ static int relay_rotate_session_streams( goto end_no_reply; } - if (payload->size < header_len) { - ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes", - header_len, payload->size); + header_len = relay_unpack_rotate_streams_header(payload, &rotate_streams); + if (header_len < 0) { ret = -1; goto end_no_reply; } - memcpy(&rotate_streams, payload->data, header_len); - - /* Convert header to host endianness. */ - rotate_streams = (typeof(rotate_streams)) { - .stream_count = be32toh(rotate_streams.stream_count), - .new_chunk_id = (typeof(rotate_streams.new_chunk_id)) { - .is_set = !!rotate_streams.new_chunk_id.is_set, - .value = be64toh(rotate_streams.new_chunk_id.value), - } - }; - if (rotate_streams.new_chunk_id.is_set) { /* * Retrieve the trace chunk the stream must transition to. As @@ -2569,7 +2591,10 @@ static int relay_rotate_session_streams( */ next_trace_chunk = sessiond_trace_chunk_registry_get_chunk( sessiond_trace_chunk_registry, - session->sessiond_uuid, session->id, + session->sessiond_uuid, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, rotate_streams.new_chunk_id.value); if (!next_trace_chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -2598,7 +2623,7 @@ static int relay_rotate_session_streams( chunk_id_str); stream_positions = lttng_buffer_view_from_view(payload, - sizeof(rotate_streams), -1); + header_len, -1); if (!stream_positions.data || stream_positions.size < (rotate_streams.stream_count * @@ -2657,12 +2682,11 @@ end_no_reply: return ret; } - - /* * relay_create_trace_chunk: create a new trace chunk */ -static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_create_trace_chunk( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2702,6 +2726,8 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, msg->creation_timestamp = be64toh(msg->creation_timestamp); msg->override_name_length = be32toh(msg->override_name_length); + pthread_mutex_lock(&conn->session->lock); + session->ongoing_rotation = true; if (session->current_trace_chunk && !lttng_trace_chunk_get_name_overridden(session->current_trace_chunk)) { chunk_status = lttng_trace_chunk_rename_path(session->current_trace_chunk, @@ -2713,7 +2739,6 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } } - session->ongoing_rotation = true; if (!session->current_trace_chunk) { if (!session->has_rotated) { new_path = ""; @@ -2792,7 +2817,9 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, published_chunk = sessiond_trace_chunk_registry_publish_chunk( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, - conn->session->id, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, chunk); if (!published_chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -2807,7 +2834,6 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } - pthread_mutex_lock(&conn->session->lock); if (conn->session->pending_closure_trace_chunk) { /* * Invalid; this means a second create_trace_chunk command was @@ -2816,7 +2842,7 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, ERR("Invalid trace chunk close command received; a trace chunk is already waiting for a trace chunk close command"); reply_code = LTTNG_ERR_INVALID_PROTOCOL; ret = -1; - goto end_unlock_session; + goto end; } conn->session->pending_closure_trace_chunk = conn->session->current_trace_chunk; @@ -2825,9 +2851,8 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, if (!conn->session->pending_closure_trace_chunk) { session->ongoing_rotation = false; } -end_unlock_session: - pthread_mutex_unlock(&conn->session->lock); end: + pthread_mutex_unlock(&conn->session->lock); reply.ret_code = htobe32((uint32_t) reply_code); send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, @@ -2847,7 +2872,8 @@ end_no_reply: /* * relay_close_trace_chunk: close a trace chunk */ -static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_close_trace_chunk( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2900,7 +2926,9 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, chunk = sessiond_trace_chunk_registry_get_chunk( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, - conn->session->id, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, chunk_id); if (!chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -3101,7 +3129,8 @@ end_no_reply: /* * relay_trace_chunk_exists: check if a trace chunk exists */ -static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_trace_chunk_exists( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -3164,7 +3193,8 @@ end_no_reply: /* * relay_get_configuration: query whether feature is available */ -static int relay_get_configuration(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_get_configuration( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -3210,86 +3240,68 @@ end_no_reply: return ret; } -#define DBG_CMD(cmd_name, conn) \ - DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); - static int relay_process_control_command(struct relay_connection *conn, const struct lttcomm_relayd_hdr *header, const struct lttng_buffer_view *payload) { int ret = 0; + DBG3("Processing \"%s\" command for socket %i", + lttcomm_relayd_command_str((lttcomm_relayd_command) header->cmd), + conn->sock->fd); switch (header->cmd) { case RELAYD_CREATE_SESSION: - DBG_CMD("RELAYD_CREATE_SESSION", conn); ret = relay_create_session(header, conn, payload); break; case RELAYD_ADD_STREAM: - DBG_CMD("RELAYD_ADD_STREAM", conn); ret = relay_add_stream(header, conn, payload); break; case RELAYD_START_DATA: - DBG_CMD("RELAYD_START_DATA", conn); ret = relay_start(header, conn, payload); break; case RELAYD_SEND_METADATA: - DBG_CMD("RELAYD_SEND_METADATA", conn); ret = relay_recv_metadata(header, conn, payload); break; case RELAYD_VERSION: - DBG_CMD("RELAYD_VERSION", conn); ret = relay_send_version(header, conn, payload); break; case RELAYD_CLOSE_STREAM: - DBG_CMD("RELAYD_CLOSE_STREAM", conn); ret = relay_close_stream(header, conn, payload); break; case RELAYD_DATA_PENDING: - DBG_CMD("RELAYD_DATA_PENDING", conn); ret = relay_data_pending(header, conn, payload); break; case RELAYD_QUIESCENT_CONTROL: - DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn); ret = relay_quiescent_control(header, conn, payload); break; case RELAYD_BEGIN_DATA_PENDING: - DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn); ret = relay_begin_data_pending(header, conn, payload); break; case RELAYD_END_DATA_PENDING: - DBG_CMD("RELAYD_END_DATA_PENDING", conn); ret = relay_end_data_pending(header, conn, payload); break; case RELAYD_SEND_INDEX: - DBG_CMD("RELAYD_SEND_INDEX", conn); ret = relay_recv_index(header, conn, payload); break; case RELAYD_STREAMS_SENT: - DBG_CMD("RELAYD_STREAMS_SENT", conn); ret = relay_streams_sent(header, conn, payload); break; case RELAYD_RESET_METADATA: - DBG_CMD("RELAYD_RESET_METADATA", conn); ret = relay_reset_metadata(header, conn, payload); break; case RELAYD_ROTATE_STREAMS: - DBG_CMD("RELAYD_ROTATE_STREAMS", conn); ret = relay_rotate_session_streams(header, conn, payload); break; case RELAYD_CREATE_TRACE_CHUNK: - DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); ret = relay_create_trace_chunk(header, conn, payload); break; case RELAYD_CLOSE_TRACE_CHUNK: - DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn); ret = relay_close_trace_chunk(header, conn, payload); break; case RELAYD_TRACE_CHUNK_EXISTS: - DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn); ret = relay_trace_chunk_exists(header, conn, payload); break; case RELAYD_GET_CONFIGURATION: - DBG_CMD("RELAYD_GET_CONFIGURATION", conn); ret = relay_get_configuration(header, conn, payload); break; case RELAYD_UPDATE_SYNC_INFO: @@ -3324,7 +3336,10 @@ static enum relay_connection_status relay_process_control_receive_payload( reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive command payload on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; @@ -3397,7 +3412,10 @@ static enum relay_connection_status relay_process_control_receive_header( reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive control command header on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; @@ -3437,9 +3455,9 @@ static enum relay_connection_status relay_process_control_receive_header( memcpy(&conn->protocol.ctrl.state.receive_payload.header, &header, sizeof(header)); - DBG("Done receiving control command header: fd = %i, cmd = %" PRIu32 ", cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes", - conn->sock->fd, header.cmd, header.cmd_version, - header.data_size); + DBG("Done receiving control command header: fd = %i, cmd = %s, cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes", + conn->sock->fd, lttcomm_relayd_command_str((enum lttcomm_relayd_command) header.cmd), + header.cmd_version, header.data_size); if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) { ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.", @@ -3508,7 +3526,10 @@ static enum relay_connection_status relay_process_data_receive_header( state->header_reception_buffer + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive data header on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } @@ -3635,7 +3656,10 @@ static enum relay_connection_status relay_process_data_receive_payload( ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Socket %d error", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } @@ -3804,7 +3828,7 @@ static void relay_thread_close_connection(struct lttng_poll_event *events, /* * This thread does the actual work */ -static void *relay_thread_worker(void *data) +static void *relay_thread_worker(void *data __attribute__((unused))) { int ret, err = -1, last_seen_data_fd = -1; uint32_t nb_fd; @@ -3871,14 +3895,14 @@ restart: */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -4132,7 +4156,7 @@ static int create_relay_conn_pipe(void) "Relayd connection pipe", relay_conn_pipe); } -static int stdio_open(void *data, int *fds) +static int stdio_open(void *data __attribute__((unused)), int *fds) { fds[0] = fileno(stdout); fds[1] = fileno(stderr); @@ -4284,7 +4308,7 @@ int main(int argc, char **argv) } /* Create thread quit pipe */ - if (init_thread_quit_pipe()) { + if (relayd_init_thread_quit_pipe()) { retval = -1; goto exit_options; } @@ -4331,7 +4355,7 @@ int main(int argc, char **argv) /* Create thread to manage the client socket */ ret = pthread_create(&health_thread, default_pthread_attr(), - thread_manage_health, (void *) NULL); + thread_manage_health_relayd, (void *) NULL); if (ret) { errno = ret; PERROR("pthread_create health");