Fix live-comm: merge TCP socket write-write sequence in a single write
author Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 24 Jul 2017 20:07:00 +0000 (16:07 -0400)
committer Jérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 27 Jul 2017 20:19:21 +0000 (16:19 -0400)
The live protocol implementation is often sending content
on TCP sockets in two separate writes. One to send a command header,
and the second one sending the command's payload. This was presumably
done under the assumption that it would not result in two separate
TCP packets being sent on the network (or that it would not matter).

Delayed ACK-induced delays were observed [1] on the second write of the
"write header, write payload" sequence and result in problematic
latency build-ups for live clients connected to moderately/highly
active sessions.

Fundamentally, this problem arises due to the combination of Nagle's
algorithm and the delayed ACK mechanism, which makes write-write-read
sequences on TCP sockets problematic, as near-constant latency is
expected when clients can keep up with the event production rate.

In such a write-write-read sequence, the second write is held up until
the first write is acknowledged (TCP ACK). The solution implemented
by this patch bundles the writes into a single one [2].

[1] https://github.com/tbricks/wireshark-lttng-plugin
    Basic Wireshark dissector for lttng-live by Anton Smyk from Itiviti
[2] https://lists.freebsd.org/pipermail/freebsd-net/2006-January/009527.html

Reported-by: Anton Smyk <anton.smyk@itiviti.com>
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.c

index ffec5977e54984d99e72ae83fb2c79f3029481a7..c9a4575eb4e9d2f328a81164c6d2f9d0f1722d07 100644 (file)
@@ -1479,13 +1479,15 @@ error_put:
 static
 int viewer_get_packet(struct relay_connection *conn)
 {
-       int ret, send_data = 0;
+       int ret;
        char *data = NULL;
-       uint32_t len = 0;
-       ssize_t read_len;
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply;
        struct relay_viewer_stream *vstream = NULL;
+       bool skip_send_data = false;
+       uint32_t send_len = sizeof(reply);
+       uint32_t packet_data_len = 0;
+       ssize_t read_len;
 
        DBG2("Relay get data packet");
 
@@ -1503,21 +1505,26 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
+               skip_send_data = true;
                DBG("Client requested packet of unknown stream id %" PRIu64,
                                be64toh(get_packet_info.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
-               goto send_reply_nolock;
+       } else {
+               packet_data_len = be32toh(get_packet_info.len);
+               send_len += packet_data_len;
        }
 
-       pthread_mutex_lock(&vstream->stream->lock);
-
-       len = be32toh(get_packet_info.len);
-       data = zmalloc(len);
+       data = zmalloc(send_len);
        if (!data) {
                PERROR("relay data zmalloc");
                goto error;
        }
 
+       if (skip_send_data) {
+               goto send_reply_nolock;
+       }
+
+       pthread_mutex_lock(&vstream->stream->lock);
        ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
                        SEEK_SET);
        if (ret < 0) {
@@ -1525,16 +1532,17 @@ int viewer_get_packet(struct relay_connection *conn)
                        be64toh(get_packet_info.offset));
                goto error;
        }
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
-       if (read_len < len) {
+       read_len = lttng_read(vstream->stream_fd->fd,
+                       data + sizeof(reply),
+                       packet_data_len);
+       if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
                                vstream->stream_fd->fd,
                                be64toh(get_packet_info.offset));
                goto error;
        }
        reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
-       reply.len = htobe32(len);
-       send_data = 1;
+       reply.len = htobe32(packet_data_len);
        goto send_reply;
 
 error:
@@ -1545,26 +1553,19 @@ send_reply:
                pthread_mutex_unlock(&vstream->stream->lock);
        }
 send_reply_nolock:
-       reply.flags = htobe32(reply.flags);
 
        health_code_update();
 
-       ret = send_response(conn->sock, &reply, sizeof(reply));
+       memcpy(data, &reply, sizeof(reply));
+       health_code_update();
+       ret = send_response(conn->sock, data, send_len);
+       health_code_update();
        if (ret < 0) {
+               PERROR("sendmsg of packet data failed");
                goto end_free;
        }
-       health_code_update();
-
-       if (send_data) {
-               health_code_update();
-               ret = send_response(conn->sock, data, len);
-               if (ret < 0) {
-                       goto end_free;
-               }
-               health_code_update();
-       }
 
-       DBG("Sent %u bytes for stream %" PRIu64, len,
+       DBG("Sent %u bytes for stream %" PRIu64, send_len,
                        be64toh(get_packet_info.stream_id));
 
 end_free:
This page took 0.028446 seconds and 4 git commands to generate.