X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=11e2f799dda14b2e364fffcf701a42a519a4f920;hb=59325698864596d9c424fc3eb57affb23ec4380b;hp=2667fa6ff212bd3e34b3735209573b11561edfe5;hpb=22afdf060582231ffcaae1758a9c3dd7c9679631;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 2667fa6ff..11e2f799d 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -1165,6 +1165,16 @@ int viewer_get_new_streams(struct relay_connection *conn) * the viewer's point of view. */ pthread_mutex_lock(&session->lock); + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session->ongoing_rotation) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW); + goto send_reply_unlock; + } ret = make_viewer_streams(session, conn->viewer_session, LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, @@ -1305,6 +1315,17 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } + /* + * If a session rotation is ongoing, do not attempt to open any + * stream, because the chunk can be in an intermediate state + * due to directory renaming. + */ + if (session->ongoing_rotation) { + DBG("Relay session %" PRIu64 " rotation ongoing", session_id); + send_streams = 0; + goto send_reply; + } + ret = make_viewer_streams(session, conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed); @@ -1609,6 +1630,13 @@ int viewer_get_next_index(struct relay_connection *conn) metadata_viewer_stream = ctf_trace_get_viewer_metadata_stream(ctf_trace); + /* + * Hold the session lock to protect against concurrent changes + * to the chunk files (e.g. rename done by clear), which are + * protected by the session ongoing rotation state. Those are + * synchronized with the session lock. + */ + pthread_mutex_lock(&rstream->trace->session->lock); pthread_mutex_lock(&rstream->lock); /* @@ -1659,9 +1687,9 @@ int viewer_get_next_index(struct relay_connection *conn) * This allows clients to consume all the packets of a trace chunk * after a session's destruction. */ - if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk && + if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) && !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) { - DBG("Viewer session and viewer stream chunk differ: " + DBG("Viewer session and viewer stream chunk IDs differ: " "vsession chunk %p vstream chunk %p", conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk); @@ -1775,6 +1803,7 @@ int viewer_get_next_index(struct relay_connection *conn) send_reply: if (rstream) { pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); } if (metadata_viewer_stream) { @@ -1816,6 +1845,7 @@ end: error_put: pthread_mutex_unlock(&rstream->lock); + pthread_mutex_unlock(&rstream->trace->session->lock); if (metadata_viewer_stream) { viewer_stream_put(metadata_viewer_stream); } @@ -2002,11 +2032,11 @@ int viewer_get_metadata(struct relay_connection *conn) * an error. */ if (vstream->metadata_sent > 0) { - vstream->stream->no_new_metadata_notified = true; - if (vstream->stream->closed) { + if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { /* Release ownership for the viewer metadata stream. */ viewer_stream_put(vstream); } + vstream->stream->no_new_metadata_notified = true; } goto send_reply; } @@ -2028,8 +2058,8 @@ int viewer_get_metadata(struct relay_connection *conn) } if (conn->viewer_session->current_trace_chunk && - conn->viewer_session->current_trace_chunk != - vstream->stream_file.trace_chunk) { + !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk)) { bool acquired_reference; DBG("Viewer session and viewer stream chunk differ: "