X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=1168b827a6d13122d06683fbdeae1b278d5b9c23;hb=283a96e49f066a4263726271bc64aa7e94ae0e92;hp=f1f97f91748b9337c43636d2949cf77b187ed336;hpb=d73aeddd1b4de7fadc7b6f6f5004c6298208602a;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index f1f97f917..1168b827a 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -238,7 +238,6 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) static int check_new_streams(struct relay_connection *conn) { struct relay_session *session; - unsigned long current_val; int ret = 0; if (!conn->viewer_session) { @@ -253,15 +252,17 @@ static int check_new_streams(struct relay_connection *conn) if (!session_get(session)) { continue; } - current_val = uatomic_cmpxchg(&session->new_streams, 1, 0); - ret = current_val; + + ret = uatomic_read(&session->new_streams); session_put(session); if (ret == 1) { goto end; } } } + end: + DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd); return ret; } @@ -1218,6 +1219,8 @@ static int viewer_get_new_streams(struct relay_connection *conn) response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply_unlock; } + + uatomic_set(&session->new_streams, 0); send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); @@ -1661,6 +1664,7 @@ static int viewer_get_next_index(struct relay_connection *conn) bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind; uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL; enum lttng_trace_chunk_status status; + bool attached_sessions_have_new_streams = false; LTTNG_ASSERT(conn); @@ -1712,6 +1716,17 @@ static int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } + ret = check_new_streams(conn); + if (ret < 0) { + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Error checking for new streams in the attached sessions, returning status=%s", + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); + goto send_reply; + } else if (ret == 1) { + attached_sessions_have_new_streams = true; + } + if (rstream->ongoing_rotation.is_set) { /* Rotation is ongoing, try again later. */ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; @@ -1821,6 +1836,7 @@ static int viewer_get_next_index(struct relay_connection *conn) */ goto send_reply; } + /* At this point, ret is 0 thus we will be able to read the index. */ LTTNG_ASSERT(!ret); @@ -1899,19 +1915,6 @@ static int viewer_get_next_index(struct relay_connection *conn) vstream->stream_file.handle = fs_handle; } - ret = check_new_streams(conn); - if (ret < 0) { - viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - ERR("Error checking for new streams before sending new index to stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); - goto send_reply; - } else if (ret == 1) { - viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; - } - ret = lttng_index_file_read(vstream->index_file, &packet_index); if (ret) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; @@ -1962,6 +1965,10 @@ send_reply: pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); } + if (attached_sessions_have_new_streams) { + viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; + } + viewer_index.flags = htobe32(viewer_index.flags); viewer_index.status = htobe32(viewer_index.status); health_code_update();