relayd: optimize receive throughput
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 057ac4046c01e67ae05fa938db49e89c9c913718..1ac975660b5ada3a9fe30a2bd5330b22ca1b94aa 100644 (file)
@@ -71,6 +71,7 @@
 #include "session.h"
 #include "stream.h"
 #include "connection.h"
+#include "tracefile-array.h"
 
 /* command line options */
 char *opt_output_path;
@@ -82,6 +83,10 @@ static int opt_daemon, opt_background;
  */
 #define NR_LTTNG_RELAY_READY   3
 static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
+
+/* Size of receive buffer. */
+#define RECV_DATA_BUFFER_SIZE          65536
+
 static int recv_child_signal;  /* Set to 1 when a SIGUSR1 signal is received. */
 static pid_t child_ppid;       /* Internal parent PID use with daemonize. */
 
@@ -863,10 +868,7 @@ restart:
                                goto exit;
                        }
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("socket poll error");
-                               goto error;
-                       } else if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
                                /*
                                 * A new connection is requested, therefore a
                                 * sessiond/consumerd connection is allocated in
@@ -918,6 +920,12 @@ restart:
                                 * exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&relay_conn_queue.futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -1265,9 +1273,24 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
+
+       /*
+        * Set last_net_seq_num before the close flag. Required by data
+        * pending check.
+        */
        pthread_mutex_lock(&stream->lock);
-       stream->closed = true;
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+       pthread_mutex_unlock(&stream->lock);
+
+       /*
+        * This is one of the conditions which may trigger a stream close
+        * with the others being:
+        *     1) A close command is received for a stream
+        *     2) The control connection owning the stream is closed
+        *     3) We have received all of the stream's data _after_ a close
+        *        request.
+        */
+       try_stream_close(stream);
        if (stream->is_metadata) {
                struct relay_viewer_stream *vstream;
 
@@ -1286,7 +1309,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        viewer_stream_put(vstream);
                }
        }
-       pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 
 end:
@@ -1387,7 +1409,7 @@ end:
 static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
                struct relay_connection *conn)
 {
-       int ret = htobe32(LTTNG_OK);
+       int ret = 0;
        ssize_t size_ret;
        struct relay_session *session = conn->session;
        struct lttcomm_relayd_metadata_payload *metadata_struct;
@@ -1424,9 +1446,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
        }
        memset(data_buffer, 0, data_size);
        DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
-       ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
-       if (ret < 0 || ret != data_size) {
-               if (ret == 0) {
+       size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+       if (size_ret < 0 || size_ret != data_size) {
+               if (size_ret == 0) {
                        /* Orderly shutdown. Not necessary to print an error. */
                        DBG("Socket %d did an orderly shutdown", conn->sock->fd);
                } else {
@@ -1453,9 +1475,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
                goto end_put;
        }
 
-       ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+       size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
                        be32toh(metadata_struct->padding_size));
-       if (ret < 0) {
+       if (size_ret < 0) {
                goto end_put;
        }
 
@@ -1467,7 +1489,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
 end_put:
        pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
-
 end:
        return ret;
 }
@@ -1890,7 +1911,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                 * Only flag a stream inactive when it has already
                 * received data and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0
+               if (stream->index_received_seqcount > 0
                                && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end =
                                be64toh(index_info.timestamp_end);
@@ -1918,7 +1939,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2066,8 +2088,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
 
-       DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64,
-               stream->stream_handle, stream->tracefile_size_current);
+       DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+                       stream->stream_handle, net_seq_num, stream->tracefile_size_current);
 
        /*
         * Lookup for an existing index for that stream id/sequence
@@ -2091,7 +2113,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
                fd = index_create_file(stream->path_name, stream->channel_name,
                                -1, -1, stream->tracefile_size,
-                               stream->current_tracefile_id);
+                               tracefile_array_get_file_index_head(stream->tfa));
                if (fd < 0) {
                        ret = -1;
                        /* Put self-ref for this index due to error. */
@@ -2120,7 +2142,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* No flush. */
                ret = 0;
@@ -2146,6 +2169,10 @@ static int relay_process_data(struct relay_connection *conn)
        uint64_t net_seq_num;
        uint32_t data_size;
        struct relay_session *session;
+       bool new_stream = false, close_requested = false;
+       size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+       size_t recv_off = 0;
+       char data_buffer[chunk_size];
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2163,39 +2190,17 @@ static int relay_process_data(struct relay_connection *conn)
        stream_id = be64toh(data_hdr.stream_id);
        stream = stream_get_by_id(stream_id);
        if (!stream) {
+               ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
                ret = -1;
                goto end;
        }
        session = stream->trace->session;
        data_size = be32toh(data_hdr.data_size);
-       if (data_buffer_size < data_size) {
-               char *tmp_data_ptr;
-
-               tmp_data_ptr = realloc(data_buffer, data_size);
-               if (!tmp_data_ptr) {
-                       ERR("Allocating data buffer");
-                       free(data_buffer);
-                       ret = -1;
-                       goto end_stream_put;
-               }
-               data_buffer = tmp_data_ptr;
-               data_buffer_size = data_size;
-       }
-       memset(data_buffer, 0, data_size);
 
        net_seq_num = be64toh(data_hdr.net_seq_num);
 
        DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
                data_size, stream_id, net_seq_num);
-       ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
-       if (ret <= 0) {
-               if (ret == 0) {
-                       /* Orderly shutdown. Not necessary to print an error. */
-                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
-               }
-               ret = -1;
-               goto end_stream_put;
-       }
 
        pthread_mutex_lock(&stream->lock);
 
@@ -2203,35 +2208,23 @@ static int relay_process_data(struct relay_connection *conn)
        if (stream->tracefile_size > 0 &&
                        (stream->tracefile_size_current + data_size) >
                        stream->tracefile_size) {
-               uint64_t new_id;
+               uint64_t old_id, new_id;
+
+               old_id = tracefile_array_get_file_index_head(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa);
+
+               /* new_id is updated by utils_rotate_stream_file. */
+               new_id = old_id;
 
-               new_id = (stream->current_tracefile_id + 1) %
-                       stream->tracefile_count;
-               /*
-                * Move viewer oldest available data position forward if
-                * we are overwriting a tracefile.
-                */
-               if (new_id == stream->oldest_tracefile_id) {
-                       stream->oldest_tracefile_id =
-                               (stream->oldest_tracefile_id + 1) %
-                               stream->tracefile_count;
-               }
                ret = utils_rotate_stream_file(stream->path_name,
                                stream->channel_name, stream->tracefile_size,
                                stream->tracefile_count, -1,
                                -1, stream->stream_fd->fd,
-                               &stream->current_tracefile_id,
-                               &stream->stream_fd->fd);
+                               &new_id, &stream->stream_fd->fd);
                if (ret < 0) {
                        ERR("Rotating stream output file");
                        goto end_stream_unlock;
                }
-               stream->current_tracefile_seq++;
-               if (stream->current_tracefile_seq
-                       - stream->oldest_tracefile_seq >=
-                               stream->tracefile_count) {
-                       stream->oldest_tracefile_seq++;
-               }
                /*
                 * Reset current size because we just performed a stream
                 * rotation.
@@ -2247,33 +2240,67 @@ static int relay_process_data(struct relay_connection *conn)
        if (session->minor >= 4 && !session->snapshot) {
                ret = handle_index_data(stream, net_seq_num, rotate_index);
                if (ret < 0) {
+                       ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+                                       stream->stream_handle, net_seq_num, ret);
                        goto end_stream_unlock;
                }
        }
 
-       /* Write data to stream output fd. */
-       size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
-       if (size_ret < data_size) {
-               ERR("Relay error writing data to file");
-               ret = -1;
-               goto end_stream_unlock;
-       }
+       for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
+               size_t recv_size = min(data_size - recv_off, chunk_size);
+
+               ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
+               if (ret <= 0) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. Not necessary to print an error. */
+                               DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+                       } else {
+                               ERR("Socket %d error %d", conn->sock->fd, ret);
+                       }
+                       ret = -1;
+                       goto end_stream_unlock;
+               }
+
+               /* Write data to stream output fd. */
+               size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+                               recv_size);
+               if (size_ret < recv_size) {
+                       ERR("Relay error writing data to file");
+                       ret = -1;
+                       goto end_stream_unlock;
+               }
 
-       DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
-                       size_ret, stream->stream_handle);
+               DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+                               size_ret, stream->stream_handle);
+       }
 
        ret = write_padding_to_file(stream->stream_fd->fd,
                        be32toh(data_hdr.padding_size));
        if (ret < 0) {
+               ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+                               stream->stream_handle, net_seq_num, ret);
                goto end_stream_unlock;
        }
        stream->tracefile_size_current +=
                        data_size + be32toh(data_hdr.padding_size);
+       if (stream->prev_seq == -1ULL) {
+               new_stream = true;
+       }
+
        stream->prev_seq = net_seq_num;
 
 end_stream_unlock:
+       close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
-end_stream_put:
+       if (close_requested) {
+               try_stream_close(stream);
+       }
+
+       if (new_stream) {
+               pthread_mutex_lock(&session->lock);
+               uatomic_set(&session->new_streams, 1);
+               pthread_mutex_unlock(&session->lock);
+       }
        stream_put(stream);
 end:
        return ret;
@@ -2410,10 +2437,7 @@ restart:
 
                        /* Inspect the relay conn pipe for new connection */
                        if (pollfd == relay_conn_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Relay connection pipe error");
-                                       goto error;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        struct relay_connection *conn;
 
                                        ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
@@ -2424,6 +2448,12 @@ restart:
                                                        LPOLLIN | LPOLLRDHUP);
                                        connection_ht_add(relay_connections_ht, conn);
                                        DBG("Connection socket %d added", conn->sock->fd);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Relay connection pipe error");
+                                       goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        } else {
                                struct relay_connection *ctrl_conn;
@@ -2432,29 +2462,8 @@ restart:
                                /* If not found, there is a synchronization issue. */
                                assert(ctrl_conn);
 
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       relay_thread_close_connection(&events, pollfd, ctrl_conn);
-                                       if (last_seen_data_fd == pollfd) {
-                                               last_seen_data_fd = last_notdel_data_fd;
-                                       }
-                               } else if (revents & LPOLLIN) {
-                                       if (ctrl_conn->type == RELAY_CONTROL) {
-                                               ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr,
-                                                               sizeof(recv_hdr), 0);
-                                               if (ret <= 0) {
-                                                       /* Connection closed */
-                                                       relay_thread_close_connection(&events, pollfd,
-                                                               ctrl_conn);
-                                               } else {
-                                                       ret = relay_process_control(&recv_hdr, ctrl_conn);
-                                                       if (ret < 0) {
-                                                               /* Clear the session on error. */
-                                                               relay_thread_close_connection(&events, pollfd,
-                                                                       ctrl_conn);
-                                                       }
-                                                       seen_control = 1;
-                                               }
-                                       } else {
+                               if (ctrl_conn->type == RELAY_DATA) {
+                                       if (revents & LPOLLIN) {
                                                /*
                                                 * Flag the last seen data fd not deleted. It will be
                                                 * used as the last seen fd if any fd gets deleted in
@@ -2462,9 +2471,39 @@ restart:
                                                 */
                                                last_notdel_data_fd = pollfd;
                                        }
+                                       goto put_ctrl_connection;
+                               }
+                               assert(ctrl_conn->type == RELAY_CONTROL);
+
+                               if (revents & LPOLLIN) {
+                                       ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock,
+                                                       &recv_hdr, sizeof(recv_hdr), 0);
+                                       if (ret <= 0) {
+                                               /* Connection closed */
+                                               relay_thread_close_connection(&events, pollfd,
+                                                               ctrl_conn);
+                                       } else {
+                                               ret = relay_process_control(&recv_hdr, ctrl_conn);
+                                               if (ret < 0) {
+                                                       /* Clear the session on error. */
+                                                       relay_thread_close_connection(&events,
+                                                                       pollfd, ctrl_conn);
+                                               }
+                                               seen_control = 1;
+                                       }
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       relay_thread_close_connection(&events,
+                                                       pollfd, ctrl_conn);
+                                       if (last_seen_data_fd == pollfd) {
+                                               last_seen_data_fd = last_notdel_data_fd;
+                                       }
                                } else {
-                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
+                                       ERR("Unexpected poll events %u for control sock %d",
+                                                       revents, pollfd);
+                                       connection_put(ctrl_conn);
+                                       goto error;
                                }
+                       put_ctrl_connection:
                                connection_put(ctrl_conn);
                        }
                }
@@ -2514,17 +2553,17 @@ restart:
                                /* Skip it. Might be removed before. */
                                continue;
                        }
+                       if (data_conn->type == RELAY_CONTROL) {
+                               goto put_data_connection;
+                       }
+                       assert(data_conn->type == RELAY_DATA);
 
                        if (revents & LPOLLIN) {
-                               if (data_conn->type != RELAY_DATA) {
-                                       goto put_connection;
-                               }
-
                                ret = relay_process_data(data_conn);
                                /* Connection closed */
                                if (ret < 0) {
                                        relay_thread_close_connection(&events, pollfd,
-                                               data_conn);
+                                                       data_conn);
                                        /*
                                         * Every goto restart call sets the last seen fd where
                                         * here we don't really care since we gracefully
@@ -2536,8 +2575,14 @@ restart:
                                        connection_put(data_conn);
                                        goto restart;
                                }
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               relay_thread_close_connection(&events, pollfd,
+                                               data_conn);
+                       } else {
+                               ERR("Unknown poll events %u for data sock %d",
+                                               revents, pollfd);
                        }
-               put_connection:
+               put_data_connection:
                        connection_put(data_conn);
                }
                last_seen_data_fd = -1;
@@ -2573,7 +2618,6 @@ relay_connections_ht_error:
                DBG("Thread exited with error");
        }
        DBG("Worker thread cleanup complete");
-       free(data_buffer);
 error_testpoint:
        if (err) {
                health_error();
This page took 0.036794 seconds and 4 git commands to generate.