Fix: relayd: live client not notified of inactive streams
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 2667fa6ff212bd3e34b3735209573b11561edfe5..c266465fb3881da890a591cde1e014702f70115e 100644 (file)
@@ -88,6 +88,27 @@ static uint64_t last_relay_viewer_session_id;
 static pthread_mutex_t last_relay_viewer_session_id_lock =
                PTHREAD_MUTEX_INITIALIZER;
 
+static const char *
+lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code)
+{
+       switch (code) {
+       case LTTNG_VIEWER_INDEX_OK:
+               return "INDEX_OK";
+       case LTTNG_VIEWER_INDEX_RETRY:
+               return "INDEX_RETRY";
+       case LTTNG_VIEWER_INDEX_HUP:
+               return "INDEX_HUP";
+       case LTTNG_VIEWER_INDEX_ERR:
+               return "INDEX_ERR";
+       case LTTNG_VIEWER_INDEX_INACTIVE:
+               return "INDEX_INACTIVE";
+       case LTTNG_VIEWER_INDEX_EOF:
+               return "INDEX_EOF";
+       default:
+               abort();
+       }
+}
+
 /*
  * Cleanup the daemon
  */
@@ -156,28 +177,30 @@ static
 int check_new_streams(struct relay_connection *conn)
 {
        struct relay_session *session;
-       unsigned long current_val;
        int ret = 0;
 
+       rcu_read_lock();
        if (!conn->viewer_session) {
                goto end;
        }
-       rcu_read_lock();
-       cds_list_for_each_entry_rcu(session,
-                       &conn->viewer_session->session_list,
-                       viewer_session_node) {
+
+       cds_list_for_each_entry_rcu(
+               session, &conn->viewer_session->session_list, viewer_session_node)
+       {
                if (!session_get(session)) {
                        continue;
                }
-               current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-               ret = current_val;
+
+               ret = uatomic_read(&session->new_streams);
                session_put(session);
                if (ret == 1) {
                        goto end;
                }
        }
+
 end:
        rcu_read_unlock();
+       DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd);
        return ret;
 }
 
@@ -374,7 +397,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * chunk can be used safely.
                                 */
                                if ((relay_stream->ongoing_rotation.is_set ||
-                                                   relay_session->ongoing_rotation) &&
+                                               session_has_ongoing_rotation(relay_session)) &&
                                                relay_stream->trace_chunk) {
                                        viewer_stream_trace_chunk = lttng_trace_chunk_copy(
                                                        relay_stream->trace_chunk);
@@ -1165,13 +1188,30 @@ int viewer_get_new_streams(struct relay_connection *conn)
         * the viewer's point of view.
         */
        pthread_mutex_lock(&session->lock);
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session_has_ongoing_rotation(session)) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
+               goto send_reply_unlock;
+       }
        ret = make_viewer_streams(session,
                        conn->viewer_session,
                        LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
                        &nb_created, &closed);
        if (ret < 0) {
-               goto error_unlock_session;
+               /*
+                * This is caused by an internal error; propagate the negative
+                * 'ret' to close the connection.
+                */
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+               goto send_reply_unlock;
        }
+
+       uatomic_set(&session->new_streams, 0);
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
@@ -1225,10 +1265,6 @@ end_put_session:
        }
 error:
        return ret;
-error_unlock_session:
-       pthread_mutex_unlock(&session->lock);
-       session_put(session);
-       return ret;
 }
 
 /*
@@ -1305,6 +1341,17 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
+       /*
+        * If a session rotation is ongoing, do not attempt to open any
+        * stream, because the chunk can be in an intermediate state
+        * due to directory renaming.
+        */
+       if (session_has_ongoing_rotation(session)) {
+               DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+               send_streams = 0;
+               goto send_reply;
+       }
+
        ret = make_viewer_streams(session,
                        conn->viewer_session, seek_type,
                        &nb_streams, NULL, NULL, &closed);
@@ -1579,6 +1626,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        struct relay_stream *rstream = NULL;
        struct ctf_trace *ctf_trace = NULL;
        struct relay_viewer_stream *metadata_viewer_stream = NULL;
+       bool attached_sessions_have_new_streams = false;
 
        assert(conn);
 
@@ -1609,6 +1657,13 @@ int viewer_get_next_index(struct relay_connection *conn)
        metadata_viewer_stream =
                        ctf_trace_get_viewer_metadata_stream(ctf_trace);
 
+       /*
+        * Hold the session lock to protect against concurrent changes
+        * to the chunk files (e.g. rename done by clear), which are
+        * protected by the session ongoing rotation state. Those are
+        * synchronized with the session lock.
+        */
+       pthread_mutex_lock(&rstream->trace->session->lock);
        pthread_mutex_lock(&rstream->lock);
 
        /*
@@ -1619,13 +1674,24 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       ret = check_new_streams(conn);
+       if (ret < 0) {
+               viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+               ERR("Error checking for new streams in the attached sessions, returning status=%s",
+                   lttng_viewer_next_index_return_code_str(
+                           (enum lttng_viewer_next_index_return_code) viewer_index.status));
+               goto send_reply;
+       } else if (ret == 1) {
+               attached_sessions_have_new_streams = true;
+       }
+
        if (rstream->ongoing_rotation.is_set) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                goto send_reply;
        }
 
-       if (rstream->trace->session->ongoing_rotation) {
+       if (session_has_ongoing_rotation(rstream->trace->session)) {
                /* Rotation is ongoing, try again later. */
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                goto send_reply;
@@ -1659,9 +1725,9 @@ int viewer_get_next_index(struct relay_connection *conn)
         * This allows clients to consume all the packets of a trace chunk
         * after a session's destruction.
         */
-       if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk &&
+       if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) &&
                        !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
-               DBG("Viewer session and viewer stream chunk differ: "
+               DBG("Viewer session and viewer stream chunk IDs differ: "
                                "vsession chunk %p vstream chunk %p",
                                conn->viewer_session->current_trace_chunk,
                                vstream->stream_file.trace_chunk);
@@ -1681,6 +1747,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                 */
                goto send_reply;
        }
+
        /* At this point, ret is 0 thus we will be able to read the index. */
        assert(!ret);
 
@@ -1740,14 +1807,6 @@ int viewer_get_next_index(struct relay_connection *conn)
                vstream->stream_file.handle = fs_handle;
        }
 
-       ret = check_new_streams(conn);
-       if (ret < 0) {
-               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
-               goto send_reply;
-       } else if (ret == 1) {
-               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
-       }
-
        ret = lttng_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
                ERR("Relay error reading index file");
@@ -1775,6 +1834,7 @@ int viewer_get_next_index(struct relay_connection *conn)
 send_reply:
        if (rstream) {
                pthread_mutex_unlock(&rstream->lock);
+               pthread_mutex_unlock(&rstream->trace->session->lock);
        }
 
        if (metadata_viewer_stream) {
@@ -1791,6 +1851,10 @@ send_reply:
                pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
        }
 
+       if (attached_sessions_have_new_streams) {
+               viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+       }
+
        viewer_index.flags = htobe32(viewer_index.flags);
        health_code_update();
 
@@ -1816,6 +1880,7 @@ end:
 
 error_put:
        pthread_mutex_unlock(&rstream->lock);
+       pthread_mutex_unlock(&rstream->trace->session->lock);
        if (metadata_viewer_stream) {
                viewer_stream_put(metadata_viewer_stream);
        }
@@ -1951,6 +2016,7 @@ int viewer_get_metadata(struct relay_connection *conn)
        struct lttng_viewer_get_metadata request;
        struct lttng_viewer_metadata_packet reply;
        struct relay_viewer_stream *vstream = NULL;
+       bool dispose_of_stream = false;
 
        assert(conn);
 
@@ -1981,6 +2047,9 @@ int viewer_get_metadata(struct relay_connection *conn)
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
+
+       pthread_mutex_lock(&vstream->stream->trace->session->lock);
+       pthread_mutex_lock(&vstream->stream->trace->lock);
        pthread_mutex_lock(&vstream->stream->lock);
        if (!vstream->stream->is_metadata) {
                ERR("Invalid metadata stream");
@@ -1989,11 +2058,7 @@ int viewer_get_metadata(struct relay_connection *conn)
 
        if (vstream->metadata_sent >= vstream->stream->metadata_received) {
                /*
-                * The live viewers expect to receive a NO_NEW_METADATA
-                * status before a stream disappears, otherwise they abort the
-                * entire live connection when receiving an error status.
-                *
-                * Clear feature resets the metadata_sent to 0 until the
+                * Clear feature resets the metadata_received to 0 until the
                 * same metadata is received again.
                 */
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
@@ -2001,13 +2066,7 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * The live viewer considers a closed 0 byte metadata stream as
                 * an error.
                 */
-               if (vstream->metadata_sent > 0) {
-                       vstream->stream->no_new_metadata_notified = true;
-                       if (vstream->stream->closed) {
-                               /* Release ownership for the viewer metadata stream. */
-                               viewer_stream_put(vstream);
-                       }
-               }
+               dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed;
                goto send_reply;
        }
 
@@ -2028,8 +2087,8 @@ int viewer_get_metadata(struct relay_connection *conn)
        }
 
        if (conn->viewer_session->current_trace_chunk &&
-                       conn->viewer_session->current_trace_chunk !=
-                                       vstream->stream_file.trace_chunk) {
+                       !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+                                       vstream->stream_file.trace_chunk)) {
                bool acquired_reference;
 
                DBG("Viewer session and viewer stream chunk differ: "
@@ -2047,6 +2106,19 @@ int viewer_get_metadata(struct relay_connection *conn)
        len = vstream->stream->metadata_received - vstream->metadata_sent;
 
        if (!vstream->stream_file.trace_chunk) {
+               if (vstream->stream->trace->session->connection_closed) {
+                       /*
+                        * If the connection is closed, there is no way for the metadata stream
+                        * to ever transition back to an active chunk. As such, signal to the viewer
+                        * that there is no new metadata available.
+                        *
+                        * The stream can be disposed-of. On the next execution of this command,
+                        * the relay daemon will reply with an error status since the stream can't
+                        * be found.
+                        */
+                       dispose_of_stream = true;
+               }
+
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
                len = 0;
                goto send_reply;
@@ -2176,6 +2248,8 @@ send_reply:
        health_code_update();
        if (vstream) {
                pthread_mutex_unlock(&vstream->stream->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->lock);
+               pthread_mutex_unlock(&vstream->stream->trace->session->lock);
        }
        ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
@@ -2200,7 +2274,22 @@ end_free:
 end:
        if (vstream) {
                viewer_stream_put(vstream);
+               if (dispose_of_stream) {
+                       /*
+                        * Trigger the destruction of the viewer stream
+                        * by releasing its global reference.
+                        *
+                        * The live viewers expect to receive a NO_NEW_METADATA
+                        * status before a stream disappears, otherwise they abort the
+                        * entire live connection when receiving an error status.
+                        *
+                        * On the next query for this stream, an error will be reported to the
+                        * client.
+                        */
+                       viewer_stream_put(vstream);
+               }
        }
+
        return ret;
 }
 
This page took 0.028805 seconds and 4 git commands to generate.