X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=30c7bd1675525e56e213b089d3a0ce2ed5eee0dc;hb=584fc280d529740882a24c87a431e547b5acf8e0;hp=905e245faad068228f2fd74c007818e019ebda23;hpb=b8ee61ff06b128ff98f97bc45097ee8e582537ee;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 905e245fa..30c7bd167 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -69,6 +69,7 @@ #include "session.h" #include "stream.h" #include "connection.h" +#include "tracefile-array.h" /* command line options */ char *opt_output_path; @@ -831,10 +832,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * sessiond/consumerd connection is allocated in @@ -886,6 +884,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1233,9 +1237,24 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + + /* + * Set last_net_seq_num before the close flag. Required by data + * pending check. + */ pthread_mutex_lock(&stream->lock); - stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + pthread_mutex_unlock(&stream->lock); + + /* + * This is one of the conditions which may trigger a stream close + * with the others being: + * 1) A close command is received for a stream + * 2) The control connection owning the stream is closed + * 3) We have received all of the stream's data _after_ a close + * request. + */ + try_stream_close(stream); if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -1254,7 +1273,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, viewer_stream_put(vstream); } } - pthread_mutex_unlock(&stream->lock); stream_put(stream); end: @@ -1355,7 +1373,7 @@ end: static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret = htobe32(LTTNG_OK); + int ret = 0; ssize_t size_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_metadata_payload *metadata_struct; @@ -1392,9 +1410,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } memset(data_buffer, 0, data_size); DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret < 0 || ret != data_size) { - if (ret == 0) { + size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); + if (size_ret < 0 || size_ret != data_size) { + if (size_ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); } else { @@ -1421,9 +1439,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end_put; } - ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, be32toh(metadata_struct->padding_size)); - if (ret < 0) { + if (size_ret < 0) { goto end_put; } @@ -1435,7 +1453,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); - end: return ret; } @@ -1858,7 +1875,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, * Only flag a stream inactive when it has already * received data and no indexes are in flight. */ - if (stream->total_index_received > 0 + if (stream->index_received_seqcount > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); @@ -1886,7 +1903,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, } ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* no flush. */ ret = 0; @@ -2059,7 +2077,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, fd = index_create_file(stream->path_name, stream->channel_name, -1, -1, stream->tracefile_size, - stream->current_tracefile_id); + tracefile_array_get_file_index_head(stream->tfa)); if (fd < 0) { ret = -1; /* Put self-ref for this index due to error. */ @@ -2088,7 +2106,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* No flush. */ ret = 0; @@ -2114,7 +2133,7 @@ static int relay_process_data(struct relay_connection *conn) uint64_t net_seq_num; uint32_t data_size; struct relay_session *session; - bool new_stream = false; + bool new_stream = false, close_requested = false; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2172,35 +2191,23 @@ static int relay_process_data(struct relay_connection *conn) if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { - uint64_t new_id; + uint64_t old_id, new_id; + + old_id = tracefile_array_get_file_index_head(stream->tfa); + tracefile_array_file_rotate(stream->tfa); + + /* new_id is updated by utils_rotate_stream_file. */ + new_id = old_id; - new_id = (stream->current_tracefile_id + 1) % - stream->tracefile_count; - /* - * Move viewer oldest available data position forward if - * we are overwriting a tracefile. - */ - if (new_id == stream->oldest_tracefile_id) { - stream->oldest_tracefile_id = - (stream->oldest_tracefile_id + 1) % - stream->tracefile_count; - } ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, stream->tracefile_count, -1, -1, stream->stream_fd->fd, - &stream->current_tracefile_id, - &stream->stream_fd->fd); + &new_id, &stream->stream_fd->fd); if (ret < 0) { ERR("Rotating stream output file"); goto end_stream_unlock; } - stream->current_tracefile_seq++; - if (stream->current_tracefile_seq - - stream->oldest_tracefile_seq >= - stream->tracefile_count) { - stream->oldest_tracefile_seq++; - } /* * Reset current size because we just performed a stream * rotation. @@ -2245,7 +2252,12 @@ static int relay_process_data(struct relay_connection *conn) stream->prev_seq = net_seq_num; end_stream_unlock: + close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); + if (close_requested) { + try_stream_close(stream); + } + if (new_stream) { pthread_mutex_lock(&session->lock); uatomic_set(&session->new_streams, 1);