From: Jérémie Galarneau
Date: Fri, 15 Dec 2023 16:44:34 +0000 (-0500)
Subject: Fix: relayd: live client not notified of inactive streams
X-Git-Url: http://git.liburcu.org/?p=lttng-tools.git;a=commitdiff_plain;h=283a96e49f066a4263726271bc64aa7e94ae0e92

Fix: relayd: live client not notified of inactive streams

Observed issue
--------------

Some LTTng-tools live test failures appear to show babeltrace2 hanging
(failing to print expected events). The problem is intermittent, but
Kienan was able to develop a test case that's reproducible for him.

The test case performs the following steps:

  - Start a ust application and leave it running
  - Configure and then start an lttng live session
  - Connect a live viewer (babeltrace)
  - Run a second ust application
  - Wait for the expected number of events
  - In the failing case, no events are seen by babeltrace

Using per-uid buffers, the test typically completes normally. With
per-pid buffers, the test fails, hanging indefinitely when waiting for
the specified number of events. While "hanging", babeltrace2 is polling
the relayd.

This affects babeltrace2 stable-2.0 and master while using lttng-tools
master.

For more information, see the description of bug #1406 [1].

Cause
-----

When consuming a live trace captured in per-PID mode, Babeltrace
periodically requests the index of the next packet it should consume.
As part of the reply, it gets a 'flags' field which is used to announce
that new streams, or new metadata, are available to the viewer.

Unfortunately, these flags are only set when the relay daemon has new
tracing data to deliver. They are not set when the relay daemon
indicates that the stream is inactive (see LTTNG_VIEWER_INDEX_INACTIVE).

In the typical case where an application is spawned while others are
actively emitting events, a request for new data results in a reply
that returns an index entry (code LTTNG_VIEWER_INDEX_OK) for an
available packet, accompanied by the LTTNG_VIEWER_FLAG_NEW_STREAM flag.
This flag indicates to the viewer that it should request the new
streams (using the LTTNG_VIEWER_GET_NEW_STREAMS live protocol command)
before consuming the new data.

In the cases where we observe a hang, an application is running but not
emitting new events. As such, the relay daemon periodically emits "live
beacons" to indicate that the session's streams are inactive up to a
given time 'T'. Since the existing application remains inactive and the
viewer is never notified that new streams are available, the viewer
effectively remains "stuck" and never notices the new application being
traced.

LTTNG_VIEWER_FLAG_NEW_METADATA conveys similar semantics with regard to
the metadata. However, ignoring it for inactive streams isn't as
deleterious: the same information is made available to the viewer the
next time it successfully requests a new index from the relay daemon.
This would only become a problem if the tracers started to express
non-layout data (like supplemental environment information, though I
don't see a real use case) as part of the metadata stream that should be
made available downstream even during periods of inactivity.

Note that the same problem most likely affects the per-UID buffer
allocation mode when multiple users are being traced.

Solution
--------

On the producer end, LTTNG_VIEWER_FLAG_NEW_STREAM is set even when
returning an inactivity index.
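As a rough illustration, the producer-side rule can be modelled as
follows (a minimal sketch with hypothetical model_* names, not the
actual lttng-relayd code; see the attached diff for the real change):

    #include <stdbool.h>
    #include <stdint.h>

    /* Hypothetical stand-ins for the live protocol and session state. */
    enum model_index_status { MODEL_INDEX_OK, MODEL_INDEX_INACTIVE, MODEL_INDEX_HUP };

    #define MODEL_FLAG_NEW_STREAM (1u << 1)

    struct model_session {
            bool new_streams; /* mirrors relay_session::new_streams */
    };

    /*
     * Flags of a GET_NEXT_INDEX reply after the fix: pending new streams
     * are advertised regardless of the index status (OK, INACTIVE or HUP).
     */
    static uint32_t model_next_index_flags(const struct model_session *session,
                                           enum model_index_status status)
    {
            uint32_t flags = 0;

            (void) status; /* before the fix, only MODEL_INDEX_OK carried the flag */
            if (session->new_streams) {
                    flags |= MODEL_FLAG_NEW_STREAM;
            }

            return flags;
    }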
Note that, to preserve compatibility with older live consumers that
don't expect this flag in non-OK responses, the
LTTNG_VIEWER_FLAG_NEW_STREAM notification is repeated until the next
LTTNG_VIEWER_GET_NEW_STREAMS command completes successfully (with
LTTNG_VIEWER_NEW_STREAMS_OK). The 'new_streams' state is no longer
cleared from relay sessions during the processing of
LTTNG_VIEWER_GET_NEXT_INDEX commands. Instead, it is cleared when the
viewer requests the new streams.

On Babeltrace's end, the handler of the LTTNG_VIEWER_GET_NEXT_INDEX
command (lttng_live_get_next_index) is modified to expect
LTTNG_VIEWER_FLAG_NEW_STREAM in the cases where the command returns:

  - LTTNG_VIEWER_INDEX_OK (as done previously),
  - LTTNG_VIEWER_INDEX_HUP (new),
  - LTTNG_VIEWER_INDEX_INACTIVE (new).
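For illustration, the expected viewer-side handling can be sketched as
follows (a minimal model with hypothetical model_* names, not the
actual Babeltrace code):

    #include <stdbool.h>
    #include <stdint.h>

    /* Hypothetical stand-ins for the live protocol definitions. */
    enum model_index_status { MODEL_INDEX_OK, MODEL_INDEX_INACTIVE, MODEL_INDEX_HUP };

    #define MODEL_FLAG_NEW_STREAM (1u << 1)

    /*
     * The viewer now honours the NEW_STREAM flag for OK, INACTIVE and HUP
     * replies alike; when the flag is set, it issues a GET_NEW_STREAMS
     * command before consuming more data.
     */
    static bool model_must_request_new_streams(enum model_index_status status, uint32_t flags)
    {
            switch (status) {
            case MODEL_INDEX_OK:       /* handled before this change */
            case MODEL_INDEX_INACTIVE: /* new */
            case MODEL_INDEX_HUP:      /* new */
                    return (flags & MODEL_FLAG_NEW_STREAM) != 0;
            default:
                    return false;
            }
    }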
Drawbacks
---------

This is arguably a protocol change as none of the producers ever set
the NEW_METADATA/NEW_STREAM flags when indicating an inactive stream.

References
----------

[1] https://bugs.lttng.org/issues/1406

Fixes #1406
Change-Id: I84f53f089597ac7b22ce8bd0962d4b28112b7ab6
Signed-off-by: Jérémie Galarneau
---

diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp
index f1f97f917..1168b827a 100644
--- a/src/bin/lttng-relayd/live.cpp
+++ b/src/bin/lttng-relayd/live.cpp
@@ -238,7 +238,6 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
 static int check_new_streams(struct relay_connection *conn)
 {
 	struct relay_session *session;
-	unsigned long current_val;
 	int ret = 0;
 
 	if (!conn->viewer_session) {
@@ -253,15 +252,17 @@ static int check_new_streams(struct relay_connection *conn)
 		if (!session_get(session)) {
 			continue;
 		}
-		current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-		ret = current_val;
+
+		ret = uatomic_read(&session->new_streams);
 		session_put(session);
 		if (ret == 1) {
 			goto end;
 		}
 	}
 	}
+
 end:
+	DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd);
 	return ret;
 }
 
@@ -1218,6 +1219,8 @@ static int viewer_get_new_streams(struct relay_connection *conn)
 		response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
 		goto send_reply_unlock;
 	}
+
+	uatomic_set(&session->new_streams, 0);
 	send_streams = 1;
 	response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
@@ -1661,6 +1664,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
 	bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
 	uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
 	enum lttng_trace_chunk_status status;
+	bool attached_sessions_have_new_streams = false;
 
 	LTTNG_ASSERT(conn);
 
@@ -1712,6 +1716,17 @@ static int viewer_get_next_index(struct relay_connection *conn)
 		goto send_reply;
 	}
 
+	ret = check_new_streams(conn);
+	if (ret < 0) {
+		viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+		ERR("Error checking for new streams in the attached sessions, returning status=%s",
+		    lttng_viewer_next_index_return_code_str(
+			    (enum lttng_viewer_next_index_return_code) viewer_index.status));
+		goto send_reply;
+	} else if (ret == 1) {
+		attached_sessions_have_new_streams = true;
+	}
+
 	if (rstream->ongoing_rotation.is_set) {
 		/* Rotation is ongoing, try again later. */
 		viewer_index.status = LTTNG_VIEWER_INDEX_RETRY;
@@ -1821,6 +1836,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
 		 */
 		goto send_reply;
 	}
+
 	/* At this point, ret is 0 thus we will be able to read the index. */
 	LTTNG_ASSERT(!ret);
 
@@ -1899,19 +1915,6 @@ static int viewer_get_next_index(struct relay_connection *conn)
 		vstream->stream_file.handle = fs_handle;
 	}
 
-	ret = check_new_streams(conn);
-	if (ret < 0) {
-		viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
-		ERR("Error checking for new streams before sending new index to stream id %" PRIu64
-		    ", returning status=%s",
-		    (uint64_t) be64toh(request_index.stream_id),
-		    lttng_viewer_next_index_return_code_str(
-			    (enum lttng_viewer_next_index_return_code) viewer_index.status));
-		goto send_reply;
-	} else if (ret == 1) {
-		viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-	}
-
 	ret = lttng_index_file_read(vstream->index_file, &packet_index);
 	if (ret) {
 		viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
@@ -1962,6 +1965,10 @@ send_reply:
 		pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
 	}
 
+	if (attached_sessions_have_new_streams) {
+		viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+	}
+
 	viewer_index.flags = htobe32(viewer_index.flags);
 	viewer_index.status = htobe32(viewer_index.status);
 	health_code_update();
diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp
index 1de1f1853..ea0ed2119 100644
--- a/src/bin/lttng-relayd/main.cpp
+++ b/src/bin/lttng-relayd/main.cpp
@@ -1531,6 +1531,7 @@ static void publish_connection_local_streams(struct relay_connection *conn)
 	if (session->viewer_attached) {
 		uatomic_set(&session->new_streams, 1);
 	}
+
 	pthread_mutex_unlock(&session->lock);
 }