X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=73230955773dcb841f0a6a98a5f0cc941de9751d;hb=f056734385a493c37e45d1d5bcd65718dcac4f5d;hp=ac30351522d159b744d3ee834fbb25cb5c6dd05b;hpb=d359cebc72d028e4a87d75c907d534891c343891;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index ac3035152..732309557 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -689,6 +689,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -1065,6 +1070,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; + stream->ctf_stream_id = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); @@ -1801,9 +1807,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); /* - * Only flag a stream inactive when it has already received data. + * Only flag a stream inactive when it has already received data + * and no indexes are in flight. */ - if (stream->total_index_received > 0) { + if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); } ret = 0; @@ -1820,9 +1827,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } index_created = 1; + stream->indexes_in_flight++; } copy_index_control_data(index, &index_info); + if (stream->ctf_stream_id == -1ULL) { + stream->ctf_stream_id = be64toh(index_info.stream_id); + } if (index_created) { /* @@ -1847,6 +1858,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } end_rcu_unlock: @@ -2010,6 +2023,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } index_created = 1; + stream->indexes_in_flight++; } if (rotate_index || stream->index_fd < 0) { @@ -2052,6 +2066,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } error: @@ -2180,7 +2196,6 @@ int relay_process_data(struct relay_connection *conn) stream->tracefile_size, stream->tracefile_count, relayd_uid, relayd_gid, stream->fd, &(stream->tracefile_count_current), &stream->fd); - stream->total_index_received = 0; pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (ret < 0) { ERR("Rotating stream output file"); @@ -2253,7 +2268,7 @@ static void destroy_connection(struct lttng_ht *relay_connections_ht, connection_delete(relay_connections_ht, conn); /* For the control socket, we try to destroy the session. */ - if (conn->type == RELAY_CONTROL) { + if (conn->type == RELAY_CONTROL && conn->session) { destroy_session(conn->session, conn->sessions_ht); } @@ -2275,6 +2290,7 @@ void *relay_thread_worker(void *data) struct lttcomm_relayd_hdr recv_hdr; struct relay_local_data *relay_ctx = (struct relay_local_data *) data; struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; + struct relay_index *index; DBG("[thread] Relay worker started"); @@ -2345,6 +2361,11 @@ restart: health_code_update(); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -2447,45 +2468,49 @@ restart: health_code_update(); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Skip the command pipe. It's handled in the first loop. */ if (pollfd == relay_conn_pipe[0]) { continue; } - if (revents) { - rcu_read_lock(); - conn = connection_find_by_sock(relay_connections_ht, pollfd); - if (!conn) { - /* Skip it. Might be removed before. */ + rcu_read_lock(); + conn = connection_find_by_sock(relay_connections_ht, pollfd); + if (!conn) { + /* Skip it. Might be removed before. */ + rcu_read_unlock(); + continue; + } + + if (revents & LPOLLIN) { + if (conn->type != RELAY_DATA) { rcu_read_unlock(); continue; } - if (revents & LPOLLIN) { - if (conn->type != RELAY_DATA) { - continue; - } - - ret = relay_process_data(conn); - /* Connection closed */ - if (ret < 0) { - cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, conn); - DBG("Data connection closed with %d", pollfd); - /* - * Every goto restart call sets the last seen fd where - * here we don't really care since we gracefully - * continue the loop after the connection is deleted. - */ - } else { - /* Keep last seen port. */ - last_seen_data_fd = pollfd; - rcu_read_unlock(); - goto restart; - } + ret = relay_process_data(conn); + /* Connection closed */ + if (ret < 0) { + cleanup_connection_pollfd(&events, pollfd); + destroy_connection(relay_connections_ht, conn); + DBG("Data connection closed with %d", pollfd); + /* + * Every goto restart call sets the last seen fd where + * here we don't really care since we gracefully + * continue the loop after the connection is deleted. + */ + } else { + /* Keep last seen port. */ + last_seen_data_fd = pollfd; + rcu_read_unlock(); + goto restart; } - rcu_read_unlock(); } + rcu_read_unlock(); } last_seen_data_fd = -1; } @@ -2506,6 +2531,14 @@ error: } rcu_read_unlock(); error_poll_create: + rcu_read_lock(); + cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, + index_n.node) { + health_code_update(); + relay_index_delete(index); + relay_index_free_safe(index); + } + rcu_read_unlock(); lttng_ht_destroy(indexes_ht); indexes_ht_error: lttng_ht_destroy(relay_connections_ht);