projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Propagate whether a connection was closed cleanly or after an error
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
main.c
diff --git
a/src/bin/lttng-relayd/main.c
b/src/bin/lttng-relayd/main.c
index a312873808d09448bedae6ab9e87344ef0905f5c..09a73e392ada24798e5e1c143000f0572a253c2f 100644
(file)
--- a/
src/bin/lttng-relayd/main.c
+++ b/
src/bin/lttng-relayd/main.c
@@
-83,6
+83,14
@@
NULL
#endif
;
#endif
;
+enum relay_connection_status {
+ RELAY_CONNECTION_STATUS_OK,
+ /* An error occured while processing an event on the connection. */
+ RELAY_CONNECTION_STATUS_ERROR,
+ /* Connection closed/shutdown cleanly. */
+ RELAY_CONNECTION_STATUS_CLOSED,
+};
+
/* command line options */
char *opt_output_path;
static int opt_daemon, opt_background;
/* command line options */
char *opt_output_path;
static int opt_daemon, opt_background;
@@
-2949,9
+2957,11
@@
end:
return ret;
}
return ret;
}
-static int relay_process_control_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_payload(
+ struct relay_connection *conn)
{
int ret = 0;
{
int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
struct ctrl_connection_state_receive_payload *state =
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
struct ctrl_connection_state_receive_payload *state =
@@
-2967,11
+2977,15
@@
static int relay_process_control_receive_payload(struct relay_connection *conn)
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive command payload on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive command payload on sock %d",
+ conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-
ret = -1
;
+
status = RELAY_CONNECTION_STATUS_CLOSED
;
goto end;
}
goto end;
}
@@
-2989,7
+3003,6
@@
static int relay_process_control_receive_payload(struct relay_connection *conn)
DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
goto end;
}
@@
-3008,17
+3021,23
@@
reception_complete:
ret = relay_process_control_command(conn,
&state->header, &payload_view);
if (ret < 0) {
ret = relay_process_control_command(conn,
&state->header, &payload_view);
if (ret < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
ret = connection_reset_protocol_state(conn);
goto end;
}
ret = connection_reset_protocol_state(conn);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
end:
end:
- return
ret
;
+ return
status
;
}
}
-static int relay_process_control_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_header(
+ struct relay_connection *conn)
{
int ret = 0;
{
int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct lttcomm_relayd_hdr header;
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
struct lttcomm_relayd_hdr header;
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
@@
-3031,11
+3050,15
@@
static int relay_process_control_receive_header(struct relay_connection *conn)
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive control command header on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive control command header on sock %d",
+ conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-
ret = -1
;
+
status = RELAY_CONNECTION_STATUS_CLOSED
;
goto end;
}
goto end;
}
@@
-3053,7
+3076,6
@@
static int relay_process_control_receive_header(struct relay_connection *conn)
DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
goto end;
}
@@
-3075,7
+3097,7
@@
static int relay_process_control_receive_header(struct relay_connection *conn)
if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) {
ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.",
header.data_size);
if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) {
ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.",
header.data_size);
-
ret = -1
;
+
status = RELAY_CONNECTION_STATUS_ERROR
;
goto end;
}
goto end;
}
@@
-3085,6
+3107,7
@@
static int relay_process_control_receive_header(struct relay_connection *conn)
ret = lttng_dynamic_buffer_set_size(reception_buffer,
header.data_size);
if (ret) {
ret = lttng_dynamic_buffer_set_size(reception_buffer,
header.data_size);
if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
goto end;
}
@@
-3093,32
+3116,33
@@
static int relay_process_control_receive_header(struct relay_connection *conn)
* Manually invoke the next state as the poll loop
* will not wake-up to allow us to proceed further.
*/
* Manually invoke the next state as the poll loop
* will not wake-up to allow us to proceed further.
*/
-
ret
= relay_process_control_receive_payload(conn);
+
status
= relay_process_control_receive_payload(conn);
}
end:
}
end:
- return
ret
;
+ return
status
;
}
/*
* Process the commands received on the control socket
*/
}
/*
* Process the commands received on the control socket
*/
-static int relay_process_control(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control(
+ struct relay_connection *conn)
{
{
-
int ret = 0
;
+
enum relay_connection_status status
;
switch (conn->protocol.ctrl.state_id) {
case CTRL_CONNECTION_STATE_RECEIVE_HEADER:
switch (conn->protocol.ctrl.state_id) {
case CTRL_CONNECTION_STATE_RECEIVE_HEADER:
-
ret
= relay_process_control_receive_header(conn);
+
status
= relay_process_control_receive_header(conn);
break;
case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD:
break;
case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD:
-
ret
= relay_process_control_receive_payload(conn);
+
status
= relay_process_control_receive_payload(conn);
break;
default:
ERR("Unknown control connection protocol state encountered.");
abort();
}
break;
default:
ERR("Unknown control connection protocol state encountered.");
abort();
}
- return
ret
;
+ return
status
;
}
/*
}
/*
@@
-3190,9
+3214,11
@@
end:
return ret;
}
return ret;
}
-static int relay_process_data_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_header(
+ struct relay_connection *conn)
{
int ret;
{
int ret;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct data_connection_state_receive_header *state =
&conn->protocol.data.state.receive_header;
struct lttcomm_relayd_data_hdr header;
struct data_connection_state_receive_header *state =
&conn->protocol.data.state.receive_header;
struct lttcomm_relayd_data_hdr header;
@@
-3204,12
+3230,15
@@
static int relay_process_data_receive_header(struct relay_connection *conn)
state->header_reception_buffer + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
state->header_reception_buffer + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive data header on sock %d", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
goto end;
} else if (ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-
ret = -1
;
+
status = RELAY_CONNECTION_STATUS_CLOSED
;
goto end;
}
goto end;
}
@@
-3256,7
+3285,8
@@
static int relay_process_data_receive_header(struct relay_connection *conn)
if (!stream) {
DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
header.stream_id);
if (!stream) {
DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
header.stream_id);
- ret = 0;
+ /* Protocol error. */
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
goto end;
}
@@
-3281,6
+3311,7
@@
static int relay_process_data_receive_header(struct relay_connection *conn)
&new_id, &stream->stream_fd->fd);
if (ret < 0) {
ERR("Failed to rotate stream output file");
&new_id, &stream->stream_fd->fd);
if (ret < 0) {
ERR("Failed to rotate stream output file");
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
goto end_stream_unlock;
}
@@
-3297,12
+3328,14
@@
end_stream_unlock:
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
- return
ret
;
+ return
status
;
}
}
-static int relay_process_data_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_payload(
+ struct relay_connection *conn)
{
int ret;
{
int ret;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct relay_stream *stream;
struct data_connection_state_receive_payload *state =
&conn->protocol.data.state.receive_payload;
struct relay_stream *stream;
struct data_connection_state_receive_payload *state =
&conn->protocol.data.state.receive_payload;
@@
-3315,9
+3348,10
@@
static int relay_process_data_receive_payload(struct relay_connection *conn)
stream = stream_get_by_id(state->header.stream_id);
if (!stream) {
stream = stream_get_by_id(state->header.stream_id);
if (!stream) {
+ /* Protocol error. */
DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
state->header.stream_id);
DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
state->header.stream_id);
-
ret = 0
;
+
status = RELAY_CONNECTION_STATUS_ERROR
;
goto end;
}
goto end;
}
@@
-3341,13
+3375,16
@@
static int relay_process_data_receive_payload(struct relay_connection *conn)
ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
recv_size, MSG_DONTWAIT);
if (ret < 0) {
ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
recv_size, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Socket %d error %d", conn->sock->fd, ret);
- ret = -1;
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Socket %d error", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end_stream_unlock;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
state->header.stream_id);
goto end_stream_unlock;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
state->header.stream_id);
+ status = RELAY_CONNECTION_STATUS_CLOSED;
break;
} else if (ret < (int) recv_size) {
/*
break;
} else if (ret < (int) recv_size) {
/*
@@
-3364,7
+3401,7
@@
static int relay_process_data_receive_payload(struct relay_connection *conn)
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
-
ret = -1
;
+
status = RELAY_CONNECTION_STATUS_ERROR
;
goto end_stream_unlock;
}
goto end_stream_unlock;
}
@@
-3384,7
+3421,6
@@
static int relay_process_data_receive_payload(struct relay_connection *conn)
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
- ret = 0;
goto end_stream_unlock;
}
goto end_stream_unlock;
}
@@
-3394,6
+3430,7
@@
static int relay_process_data_receive_payload(struct relay_connection *conn)
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
goto end_stream_unlock;
}
@@
-3405,6
+3442,7
@@
static int relay_process_data_receive_payload(struct relay_connection *conn)
ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
}
goto end_stream_unlock;
}
}
@@
-3432,6
+3470,7
@@
static int relay_process_data_receive_payload(struct relay_connection *conn)
ret = try_rotate_stream(stream);
if (ret < 0) {
ret = try_rotate_stream(stream);
if (ret < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
goto end_stream_unlock;
}
@@
-3450,29
+3489,30
@@
end_stream_unlock:
stream_put(stream);
end:
stream_put(stream);
end:
- return
ret
;
+ return
status
;
}
/*
* relay_process_data: Process the data received on the data socket
*/
}
/*
* relay_process_data: Process the data received on the data socket
*/
-static int relay_process_data(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data(
+ struct relay_connection *conn)
{
{
-
int ret
;
+
enum relay_connection_status status
;
switch (conn->protocol.data.state_id) {
case DATA_CONNECTION_STATE_RECEIVE_HEADER:
switch (conn->protocol.data.state_id) {
case DATA_CONNECTION_STATE_RECEIVE_HEADER:
-
ret
= relay_process_data_receive_header(conn);
+
status
= relay_process_data_receive_header(conn);
break;
case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
break;
case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
-
ret
= relay_process_data_receive_payload(conn);
+
status
= relay_process_data_receive_payload(conn);
break;
default:
ERR("Unexpected data connection communication state.");
abort();
}
break;
default:
ERR("Unexpected data connection communication state.");
abort();
}
- return
ret
;
+ return
status
;
}
static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
}
static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
@@
-3644,9
+3684,11
@@
restart:
assert(ctrl_conn->type == RELAY_CONTROL);
if (revents & LPOLLIN) {
assert(ctrl_conn->type == RELAY_CONTROL);
if (revents & LPOLLIN) {
- ret = relay_process_control(ctrl_conn);
- if (ret < 0) {
- /* Clear the connection on error. */
+ enum relay_connection_status status;
+
+ status = relay_process_control(ctrl_conn);
+ if (status != RELAY_CONNECTION_STATUS_OK) {
+ /* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
ctrl_conn);
relay_thread_close_connection(&events,
pollfd,
ctrl_conn);
@@
-3720,9
+3762,11
@@
restart:
assert(data_conn->type == RELAY_DATA);
if (revents & LPOLLIN) {
assert(data_conn->type == RELAY_DATA);
if (revents & LPOLLIN) {
- ret = relay_process_data(data_conn);
- /* Connection closed */
- if (ret < 0) {
+ enum relay_connection_status status;
+
+ status = relay_process_data(data_conn);
+ /* Connection closed or error. */
+ if (status != RELAY_CONNECTION_STATUS_OK) {
relay_thread_close_connection(&events, pollfd,
data_conn);
/*
relay_thread_close_connection(&events, pollfd,
data_conn);
/*
This page took
0.030552 seconds
and
4
git commands to generate.