X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=72bf30b7ec10589ce797b83af494bc6f7e782e78;hb=942003e52b8fe43bfb8f28a1884d3bda7e6d1e0b;hp=dbf69a0e6537f119992ca004ec34e8e25bd3b36f;hpb=8cd15f6adb2b119ef9ea231363d74818e14ecffc;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index dbf69a0e6..72bf30b7e 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -2663,7 +2664,10 @@ static int relay_rotate_session_streams( */ next_trace_chunk = sessiond_trace_chunk_registry_get_chunk( sessiond_trace_chunk_registry, - session->sessiond_uuid, session->id, + session->sessiond_uuid, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, rotate_streams.new_chunk_id.value); if (!next_trace_chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -2885,7 +2889,9 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, published_chunk = sessiond_trace_chunk_registry_publish_chunk( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, - conn->session->id, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, chunk); if (!published_chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -2991,7 +2997,9 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, chunk = sessiond_trace_chunk_registry_get_chunk( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, - conn->session->id, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, chunk_id); if (!chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -3301,86 +3309,68 @@ end_no_reply: return ret; } -#define DBG_CMD(cmd_name, conn) \ - DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); - static int relay_process_control_command(struct relay_connection *conn, const struct lttcomm_relayd_hdr *header, const struct lttng_buffer_view *payload) { int ret = 0; + DBG3("Processing \"%s\" command for socket %i", + lttcomm_relayd_command_str((lttcomm_relayd_command) header->cmd), + conn->sock->fd); switch (header->cmd) { case RELAYD_CREATE_SESSION: - DBG_CMD("RELAYD_CREATE_SESSION", conn); ret = relay_create_session(header, conn, payload); break; case RELAYD_ADD_STREAM: - DBG_CMD("RELAYD_ADD_STREAM", conn); ret = relay_add_stream(header, conn, payload); break; case RELAYD_START_DATA: - DBG_CMD("RELAYD_START_DATA", conn); ret = relay_start(header, conn, payload); break; case RELAYD_SEND_METADATA: - DBG_CMD("RELAYD_SEND_METADATA", conn); ret = relay_recv_metadata(header, conn, payload); break; case RELAYD_VERSION: - DBG_CMD("RELAYD_VERSION", conn); ret = relay_send_version(header, conn, payload); break; case RELAYD_CLOSE_STREAM: - DBG_CMD("RELAYD_CLOSE_STREAM", conn); ret = relay_close_stream(header, conn, payload); break; case RELAYD_DATA_PENDING: - DBG_CMD("RELAYD_DATA_PENDING", conn); ret = relay_data_pending(header, conn, payload); break; case RELAYD_QUIESCENT_CONTROL: - DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn); ret = relay_quiescent_control(header, conn, payload); break; case RELAYD_BEGIN_DATA_PENDING: - DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn); ret = relay_begin_data_pending(header, conn, payload); break; case RELAYD_END_DATA_PENDING: - DBG_CMD("RELAYD_END_DATA_PENDING", conn); ret = relay_end_data_pending(header, conn, payload); break; case RELAYD_SEND_INDEX: - DBG_CMD("RELAYD_SEND_INDEX", conn); ret = relay_recv_index(header, conn, payload); break; case RELAYD_STREAMS_SENT: - DBG_CMD("RELAYD_STREAMS_SENT", conn); ret = relay_streams_sent(header, conn, payload); break; case RELAYD_RESET_METADATA: - DBG_CMD("RELAYD_RESET_METADATA", conn); ret = relay_reset_metadata(header, conn, payload); break; case RELAYD_ROTATE_STREAMS: - DBG_CMD("RELAYD_ROTATE_STREAMS", conn); ret = relay_rotate_session_streams(header, conn, payload); break; case RELAYD_CREATE_TRACE_CHUNK: - DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); ret = relay_create_trace_chunk(header, conn, payload); break; case RELAYD_CLOSE_TRACE_CHUNK: - DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn); ret = relay_close_trace_chunk(header, conn, payload); break; case RELAYD_TRACE_CHUNK_EXISTS: - DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn); ret = relay_trace_chunk_exists(header, conn, payload); break; case RELAYD_GET_CONFIGURATION: - DBG_CMD("RELAYD_GET_CONFIGURATION", conn); ret = relay_get_configuration(header, conn, payload); break; case RELAYD_UPDATE_SYNC_INFO: @@ -3415,7 +3405,10 @@ static enum relay_connection_status relay_process_control_receive_payload( reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive command payload on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; @@ -3488,7 +3481,10 @@ static enum relay_connection_status relay_process_control_receive_header( reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive control command header on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; @@ -3528,9 +3524,9 @@ static enum relay_connection_status relay_process_control_receive_header( memcpy(&conn->protocol.ctrl.state.receive_payload.header, &header, sizeof(header)); - DBG("Done receiving control command header: fd = %i, cmd = %" PRIu32 ", cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes", - conn->sock->fd, header.cmd, header.cmd_version, - header.data_size); + DBG("Done receiving control command header: fd = %i, cmd = %s, cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes", + conn->sock->fd, lttcomm_relayd_command_str((enum lttcomm_relayd_command) header.cmd), + header.cmd_version, header.data_size); if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) { ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.", @@ -3599,7 +3595,10 @@ static enum relay_connection_status relay_process_data_receive_header( state->header_reception_buffer + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive data header on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } @@ -3726,7 +3725,10 @@ static enum relay_connection_status relay_process_data_receive_payload( ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Socket %d error", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; }