Fix: relayd: live client not notified of inactive streams
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 15 Dec 2023 16:44:34 +0000 (11:44 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 12 Mar 2024 17:24:09 +0000 (13:24 -0400)
Observed issue
--------------

Some LTTng-tools live tests failures appear to show babeltrace2
hanging (failing to print expected events). The problem is intermittent,
but Kienan was able to develop a test case that's reproducible for him.

The test case performs the following steps:
  - Start a ust application and leave it running
  - Configure and then start an lttng live session
  - Connect a live viewer (babeltrace)
  - Run a second ust application
  - Wait for the expected number of events
    - In the failing case, no events are seen by babeltrace

Using per-uid buffers, the test typically completes normally. With
per-pid buffers the test fails, hanging indefinitely if waiting for the
specified number of events. While "hanging", babeltrace2 is polling the
relayd.

This affects for babeltrace2 stable-2.0 and master while using
lttng-tools master.

For more information, see the description of bug #1406[1]

Cause
-----

When consuming a live trace captured in per-PID mode, Babeltrace
periodically requests the index of the next packet it should consume.

As part of the reply, it gets a 'flags' field which is used to announce
that new streams, or new metadata, are available to the viewer.
Unfortunately, these 'flags' are only set when the relay daemon has new
tracing data to deliver. It is not set when the relay daemon indicates
that the stream is inactive (see LTTNG_VIEWER_INDEX_INACTIVE).

In the average case where an application is spawned while others are
actively emiting events, a request for new data will result in a reply
that returns an index entry (code LTTNG_VIEWER_INDEX_OK) for an
available packet accompanied by the LTTNG_VIEWER_FLAG_NEW_STREAM flag.

This flag indicates to the viewer that it should request new
streams (using the LTTNG_VIEWER_GET_NEW_STREAMS live protocol command)
before consuming the new data.

In the cases where we observe a hang, an application is running but not
emiting new events. As such, the relay daemon periodically emits "live
beacons" to indicate that the session's streams are inactive up to a
given time 'T'.

Since the existing application remains inactive and the viewer is never
notified that new streams are available, the viewer effectively remains
"stuck" and never notices the new application being traced.

The LTTNG_VIEWER_FLAG_NEW_METADATA communicates a similar semantic with
regards to the metadata. However, ignoring it for inactive streams isn't
as deleterious: the same information is made available to the viewer the
next time it will successfully request a new index to the relay daemon.

This would only become a problem if the tracers start to express
non-layout data (like supplemental environment information, but I don't
see a real use-case) as part of the metadata stream that should be made
available downstream even during periods of inactivity.

Note that the same problem most likely affects the per-UID buffer
allocation mode when multiple users are being traced.

Solution
--------

On the producer end, LTTNG_VIEWER_FLAG_NEW_STREAM is set even when
returning an inactivity index.

Note that to preserve compatibility with older live consumers that don't
expect this flag in non-OK response, the LTTNG_VIEWER_FLAG_NEW_STREAM
notification is repeated until the next LTTNG_VIEWER_GET_NEW_STREAMS
command that returns LTTNG_VIEWER_INDEX_OK.

The 'new_streams' state is no longer cleared from relay sessions during
the processing of the LTTNG_VIEWER_GET_NEXT_INDEX commands. Instead, it
is cleared when the viewer requests new streams.

On Babeltrace's end, the handler of the LTTNG_VIEWER_GET_NEXT_INDEX
command (lttng_live_get_next_index) is modified to expect
LTTNG_VIEWER_FLAG_NEW_STREAM in the cases where the command returns:
  - LTTNG_VIEWER_INDEX_OK (as done previously),
  - LTTNG_VIEWER_INDEX_HUP (new),
  - LTTNG_VIEWER_INDEX_INACTIVE (new).

Drawbacks
---------

This is arguably a protocol change as none of the producers ever set the
NEW_METADATA/NEW_STREAM flags when indicating an inactive stream.

References
----------

[1] https://bugs.lttng.org/issues/1406

Fixes #1406

Change-Id: I84f53f089597ac7b22ce8bd0962d4b28112b7ab6
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.cpp
src/bin/lttng-relayd/main.cpp

index f1f97f91748b9337c43636d2949cf77b187ed336..1168b827a6d13122d06683fbdeae1b278d5b9c23 100644 (file)
@@ -238,7 +238,6 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
 static int check_new_streams(struct relay_connection *conn)
 {
        struct relay_session *session;
-       unsigned long current_val;
        int ret = 0;
 
        if (!conn->viewer_session) {
@@ -253,15 +252,17 @@ static int check_new_streams(struct relay_connection *conn)
                        if (!session_get(session)) {
                                continue;
                        }
-                       current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-                       ret = current_val;
+
+                       ret = uatomic_read(&session->new_streams);
                        session_put(session);
                        if (ret == 1) {
                                goto end;
                        }
                }
        }
+
 end:
+       DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd);
        return ret;
 }
 
@@ -1218,6 +1219,8 @@ static int viewer_get_new_streams(struct relay_connection *conn)
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply_unlock;
        }
+
+       uatomic_set(&session->new_streams, 0);
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
@@ -1661,6 +1664,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
        bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
        uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
        enum lttng_trace_chunk_status status;
+       bool attached_sessions_have_new_streams = false;
 
        LTTNG_ASSERT(conn);
 
@@ -1712,6 +1716,17 @@ static int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       ret = check_new_streams(conn);
+       if (ret < 0) {
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error checking for new streams in the attached sessions, returning status=%s",
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               goto send_reply;
+       } else if (ret == 1) {
+               attached_sessions_have_new_streams = true;
+       }
+
        if (rstream->ongoing_rotation.is_set) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
@@ -1821,6 +1836,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
                 */
                goto send_reply;
        }
+
        /* At this point, ret is 0 thus we will be able to read the index. */
        LTTNG_ASSERT(!ret);
 
@@ -1899,19 +1915,6 @@ static int viewer_get_next_index(struct relay_connection *conn)
                vstream->stream_file.handle = fs_handle;
        }
 
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-               ERR("Error checking for new streams before sending new index to stream id %" PRIu64
-                   ", returning status=%s",
-                   (uint64_t) be64toh(request_index.stream_id),
-                   lttng_viewer_next_index_return_code_str(
-                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
-               goto send_reply;
-       } else if (ret == 1) {
-               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-       }
-
        ret = lttng_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
                viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
@@ -1962,6 +1965,10 @@ send_reply:
                pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
        }
 
+       if (attached_sessions_have_new_streams) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        viewer_index.flags = htobe32(viewer_index.flags);
        viewer_index.status = htobe32(viewer_index.status);
        health_code_update();
index 1de1f1853009e10cac954e690edd2aeaa2b25c4f..ea0ed2119b5237684b35695038658fa2ed4ea5b6 100644 (file)
@@ -1531,6 +1531,7 @@ static void publish_connection_local_streams(struct relay_connection *conn)
        if (session->viewer_attached) {
                uatomic_set(&session->new_streams, 1);
        }
+
        pthread_mutex_unlock(&session->lock);
 }
 
This page took 0.031062 seconds and 4 git commands to generate.