static pthread_mutex_t last_relay_viewer_session_id_lock =
PTHREAD_MUTEX_INITIALIZER;
+static const char *
+lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_INDEX_OK:
+ return "INDEX_OK";
+ case LTTNG_VIEWER_INDEX_RETRY:
+ return "INDEX_RETRY";
+ case LTTNG_VIEWER_INDEX_HUP:
+ return "INDEX_HUP";
+ case LTTNG_VIEWER_INDEX_ERR:
+ return "INDEX_ERR";
+ case LTTNG_VIEWER_INDEX_INACTIVE:
+ return "INDEX_INACTIVE";
+ case LTTNG_VIEWER_INDEX_EOF:
+ return "INDEX_EOF";
+ default:
+ abort();
+ }
+}
+
/*
* Cleanup the daemon
*/
int check_new_streams(struct relay_connection *conn)
{
struct relay_session *session;
- unsigned long current_val;
int ret = 0;
+ rcu_read_lock();
if (!conn->viewer_session) {
goto end;
}
- rcu_read_lock();
- cds_list_for_each_entry_rcu(session,
- &conn->viewer_session->session_list,
- viewer_session_node) {
+
+ cds_list_for_each_entry_rcu(
+ session, &conn->viewer_session->session_list, viewer_session_node)
+ {
if (!session_get(session)) {
continue;
}
- current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
- ret = current_val;
+
+ ret = uatomic_read(&session->new_streams);
session_put(session);
if (ret == 1) {
goto end;
}
}
+
end:
rcu_read_unlock();
+ DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd);
return ret;
}
* chunk can be used safely.
*/
if ((relay_stream->ongoing_rotation.is_set ||
- relay_session->ongoing_rotation) &&
+ session_has_ongoing_rotation(relay_session)) &&
relay_stream->trace_chunk) {
viewer_stream_trace_chunk = lttng_trace_chunk_copy(
relay_stream->trace_chunk);
* the viewer's point of view.
*/
pthread_mutex_lock(&session->lock);
+ /*
+ * If a session rotation is ongoing, do not attempt to open any
+ * stream, because the chunk can be in an intermediate state
+ * due to directory renaming.
+ */
+ if (session_has_ongoing_rotation(session)) {
+ DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
+ goto send_reply_unlock;
+ }
ret = make_viewer_streams(session,
conn->viewer_session,
LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
&nb_created, &closed);
if (ret < 0) {
- goto error_unlock_session;
+ /*
+ * This is caused by an internal error; propagate the negative
+ * 'ret' to close the connection.
+ */
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+ goto send_reply_unlock;
}
+
+ uatomic_set(&session->new_streams, 0);
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
}
error:
return ret;
-error_unlock_session:
- pthread_mutex_unlock(&session->lock);
- session_put(session);
- return ret;
}
/*
goto send_reply;
}
+ /*
+ * If a session rotation is ongoing, do not attempt to open any
+ * stream, because the chunk can be in an intermediate state
+ * due to directory renaming.
+ */
+ if (session_has_ongoing_rotation(session)) {
+ DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
+ send_streams = 0;
+ goto send_reply;
+ }
+
ret = make_viewer_streams(session,
conn->viewer_session, seek_type,
&nb_streams, NULL, NULL, &closed);
struct relay_stream *rstream = NULL;
struct ctf_trace *ctf_trace = NULL;
struct relay_viewer_stream *metadata_viewer_stream = NULL;
+ bool attached_sessions_have_new_streams = false;
assert(conn);
metadata_viewer_stream =
ctf_trace_get_viewer_metadata_stream(ctf_trace);
+ /*
+ * Hold the session lock to protect against concurrent changes
+ * to the chunk files (e.g. rename done by clear), which are
+ * protected by the session ongoing rotation state. Those are
+ * synchronized with the session lock.
+ */
+ pthread_mutex_lock(&rstream->trace->session->lock);
pthread_mutex_lock(&rstream->lock);
/*
goto send_reply;
}
+ ret = check_new_streams(conn);
+ if (ret < 0) {
+ viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+ ERR("Error checking for new streams in the attached sessions, returning status=%s",
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
+ goto send_reply;
+ } else if (ret == 1) {
+ attached_sessions_have_new_streams = true;
+ }
+
if (rstream->ongoing_rotation.is_set) {
/* Rotation is ongoing, try again later. */
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
goto send_reply;
}
- if (rstream->trace->session->ongoing_rotation) {
+ if (session_has_ongoing_rotation(rstream->trace->session)) {
/* Rotation is ongoing, try again later. */
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
goto send_reply;
* This allows clients to consume all the packets of a trace chunk
* after a session's destruction.
*/
- if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk &&
+ if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk) &&
!(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) {
- DBG("Viewer session and viewer stream chunk differ: "
+ DBG("Viewer session and viewer stream chunk IDs differ: "
"vsession chunk %p vstream chunk %p",
conn->viewer_session->current_trace_chunk,
vstream->stream_file.trace_chunk);
*/
goto send_reply;
}
+
/* At this point, ret is 0 thus we will be able to read the index. */
assert(!ret);
vstream->stream_file.handle = fs_handle;
}
- ret = check_new_streams(conn);
- if (ret < 0) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
- goto send_reply;
- } else if (ret == 1) {
- viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
- }
-
ret = lttng_index_file_read(vstream->index_file, &packet_index);
if (ret) {
ERR("Relay error reading index file");
send_reply:
if (rstream) {
pthread_mutex_unlock(&rstream->lock);
+ pthread_mutex_unlock(&rstream->trace->session->lock);
}
if (metadata_viewer_stream) {
pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
}
+ if (attached_sessions_have_new_streams) {
+ viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+ }
+
viewer_index.flags = htobe32(viewer_index.flags);
health_code_update();
error_put:
pthread_mutex_unlock(&rstream->lock);
+ pthread_mutex_unlock(&rstream->trace->session->lock);
if (metadata_viewer_stream) {
viewer_stream_put(metadata_viewer_stream);
}
struct lttng_viewer_get_metadata request;
struct lttng_viewer_metadata_packet reply;
struct relay_viewer_stream *vstream = NULL;
+ bool dispose_of_stream = false;
assert(conn);
reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
goto send_reply;
}
+
+ pthread_mutex_lock(&vstream->stream->trace->session->lock);
+ pthread_mutex_lock(&vstream->stream->trace->lock);
pthread_mutex_lock(&vstream->stream->lock);
if (!vstream->stream->is_metadata) {
ERR("Invalid metadata stream");
if (vstream->metadata_sent >= vstream->stream->metadata_received) {
/*
- * The live viewers expect to receive a NO_NEW_METADATA
- * status before a stream disappears, otherwise they abort the
- * entire live connection when receiving an error status.
- *
- * Clear feature resets the metadata_sent to 0 until the
+ * Clear feature resets the metadata_received to 0 until the
* same metadata is received again.
*/
reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
* The live viewer considers a closed 0 byte metadata stream as
* an error.
*/
- if (vstream->metadata_sent > 0) {
- vstream->stream->no_new_metadata_notified = true;
- if (vstream->stream->closed) {
- /* Release ownership for the viewer metadata stream. */
- viewer_stream_put(vstream);
- }
- }
+ dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed;
goto send_reply;
}
}
if (conn->viewer_session->current_trace_chunk &&
- conn->viewer_session->current_trace_chunk !=
- vstream->stream_file.trace_chunk) {
+ !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
+ vstream->stream_file.trace_chunk)) {
bool acquired_reference;
DBG("Viewer session and viewer stream chunk differ: "
len = vstream->stream->metadata_received - vstream->metadata_sent;
if (!vstream->stream_file.trace_chunk) {
+ if (vstream->stream->trace->session->connection_closed) {
+ /*
+ * If the connection is closed, there is no way for the metadata stream
+ * to ever transition back to an active chunk. As such, signal to the viewer
+ * that there is no new metadata available.
+ *
+ * The stream can be disposed-of. On the next execution of this command,
+ * the relay daemon will reply with an error status since the stream can't
+ * be found.
+ */
+ dispose_of_stream = true;
+ }
+
reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
len = 0;
goto send_reply;
health_code_update();
if (vstream) {
pthread_mutex_unlock(&vstream->stream->lock);
+ pthread_mutex_unlock(&vstream->stream->trace->lock);
+ pthread_mutex_unlock(&vstream->stream->trace->session->lock);
}
ret = send_response(conn->sock, &reply, sizeof(reply));
if (ret < 0) {
end:
if (vstream) {
viewer_stream_put(vstream);
+ if (dispose_of_stream) {
+ /*
+ * Trigger the destruction of the viewer stream
+ * by releasing its global reference.
+ *
+ * The live viewers expect to receive a NO_NEW_METADATA
+ * status before a stream disappears, otherwise they abort the
+ * entire live connection when receiving an error status.
+ *
+ * On the next query for this stream, an error will be reported to the
+ * client.
+ */
+ viewer_stream_put(vstream);
+ }
}
+
return ret;
}