X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=c266465fb3881da890a591cde1e014702f70115e;hb=5c764488e215742fbad29fd12ef904fda59878db;hp=2e1266de6b699f994a6b4605a7ce53f3827a5c54;hpb=94814ee3f61dccf03001535eeaff10670b47e63b;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 2e1266de6..c266465fb 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -88,6 +88,27 @@ static uint64_t last_relay_viewer_session_id; static pthread_mutex_t last_relay_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER; +static const char * +lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_INDEX_OK: + return "INDEX_OK"; + case LTTNG_VIEWER_INDEX_RETRY: + return "INDEX_RETRY"; + case LTTNG_VIEWER_INDEX_HUP: + return "INDEX_HUP"; + case LTTNG_VIEWER_INDEX_ERR: + return "INDEX_ERR"; + case LTTNG_VIEWER_INDEX_INACTIVE: + return "INDEX_INACTIVE"; + case LTTNG_VIEWER_INDEX_EOF: + return "INDEX_EOF"; + default: + abort(); + } +} + /* * Cleanup the daemon */ @@ -156,28 +177,30 @@ static int check_new_streams(struct relay_connection *conn) { struct relay_session *session; - unsigned long current_val; int ret = 0; + rcu_read_lock(); if (!conn->viewer_session) { goto end; } - rcu_read_lock(); - cds_list_for_each_entry_rcu(session, - &conn->viewer_session->session_list, - viewer_session_node) { + + cds_list_for_each_entry_rcu( + session, &conn->viewer_session->session_list, viewer_session_node) + { if (!session_get(session)) { continue; } - current_val = uatomic_cmpxchg(&session->new_streams, 1, 0); - ret = current_val; + + ret = uatomic_read(&session->new_streams); session_put(session); if (ret == 1) { goto end; } } + end: rcu_read_unlock(); + DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd); return ret; } @@ -1187,6 +1210,8 @@ int viewer_get_new_streams(struct relay_connection *conn) response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply_unlock; } + + uatomic_set(&session->new_streams, 0); send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); @@ -1601,6 +1626,7 @@ int viewer_get_next_index(struct relay_connection *conn) struct relay_stream *rstream = NULL; struct ctf_trace *ctf_trace = NULL; struct relay_viewer_stream *metadata_viewer_stream = NULL; + bool attached_sessions_have_new_streams = false; assert(conn); @@ -1648,6 +1674,17 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } + ret = check_new_streams(conn); + if (ret < 0) { + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Error checking for new streams in the attached sessions, returning status=%s", + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); + goto send_reply; + } else if (ret == 1) { + attached_sessions_have_new_streams = true; + } + if (rstream->ongoing_rotation.is_set) { /* Rotation is ongoing, try again later. */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); @@ -1710,6 +1747,7 @@ int viewer_get_next_index(struct relay_connection *conn) */ goto send_reply; } + /* At this point, ret is 0 thus we will be able to read the index. */ assert(!ret); @@ -1769,14 +1807,6 @@ int viewer_get_next_index(struct relay_connection *conn) vstream->stream_file.handle = fs_handle; } - ret = check_new_streams(conn); - if (ret < 0) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); - goto send_reply; - } else if (ret == 1) { - viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; - } - ret = lttng_index_file_read(vstream->index_file, &packet_index); if (ret) { ERR("Relay error reading index file"); @@ -1821,6 +1851,10 @@ send_reply: pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); } + if (attached_sessions_have_new_streams) { + viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; + } + viewer_index.flags = htobe32(viewer_index.flags); health_code_update(); @@ -1982,6 +2016,7 @@ int viewer_get_metadata(struct relay_connection *conn) struct lttng_viewer_get_metadata request; struct lttng_viewer_metadata_packet reply; struct relay_viewer_stream *vstream = NULL; + bool dispose_of_stream = false; assert(conn); @@ -2012,6 +2047,9 @@ int viewer_get_metadata(struct relay_connection *conn) reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } + + pthread_mutex_lock(&vstream->stream->trace->session->lock); + pthread_mutex_lock(&vstream->stream->trace->lock); pthread_mutex_lock(&vstream->stream->lock); if (!vstream->stream->is_metadata) { ERR("Invalid metadata stream"); @@ -2020,11 +2058,7 @@ int viewer_get_metadata(struct relay_connection *conn) if (vstream->metadata_sent >= vstream->stream->metadata_received) { /* - * The live viewers expect to receive a NO_NEW_METADATA - * status before a stream disappears, otherwise they abort the - * entire live connection when receiving an error status. - * - * Clear feature resets the metadata_sent to 0 until the + * Clear feature resets the metadata_received to 0 until the * same metadata is received again. */ reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); @@ -2032,13 +2066,7 @@ int viewer_get_metadata(struct relay_connection *conn) * The live viewer considers a closed 0 byte metadata stream as * an error. */ - if (vstream->metadata_sent > 0) { - if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { - /* Release ownership for the viewer metadata stream. */ - viewer_stream_put(vstream); - } - vstream->stream->no_new_metadata_notified = true; - } + dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed; goto send_reply; } @@ -2078,6 +2106,19 @@ int viewer_get_metadata(struct relay_connection *conn) len = vstream->stream->metadata_received - vstream->metadata_sent; if (!vstream->stream_file.trace_chunk) { + if (vstream->stream->trace->session->connection_closed) { + /* + * If the connection is closed, there is no way for the metadata stream + * to ever transition back to an active chunk. As such, signal to the viewer + * that there is no new metadata available. + * + * The stream can be disposed-of. On the next execution of this command, + * the relay daemon will reply with an error status since the stream can't + * be found. + */ + dispose_of_stream = true; + } + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); len = 0; goto send_reply; @@ -2207,6 +2248,8 @@ send_reply: health_code_update(); if (vstream) { pthread_mutex_unlock(&vstream->stream->lock); + pthread_mutex_unlock(&vstream->stream->trace->lock); + pthread_mutex_unlock(&vstream->stream->trace->session->lock); } ret = send_response(conn->sock, &reply, sizeof(reply)); if (ret < 0) { @@ -2231,7 +2274,22 @@ end_free: end: if (vstream) { viewer_stream_put(vstream); + if (dispose_of_stream) { + /* + * Trigger the destruction of the viewer stream + * by releasing its global reference. + * + * The live viewers expect to receive a NO_NEW_METADATA + * status before a stream disappears, otherwise they abort the + * entire live connection when receiving an error status. + * + * On the next query for this stream, an error will be reported to the + * client. + */ + viewer_stream_put(vstream); + } } + return ret; }