X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=ea0ed2119b5237684b35695038658fa2ed4ea5b6;hb=283a96e49f066a4263726271bc64aa7e94ae0e92;hp=9793e277808975728d5f6965e12f8648de35a9ff;hpb=5c7248cd5bce45bf64d563fb4e130a63bf345f11;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index 9793e2778..ea0ed2119 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include @@ -1320,12 +1321,13 @@ static void *relay_thread_dispatcher(void *data __attribute__((unused))) * the data will be read at some point in time * or wait to the end of the world :) */ - ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT - sizeof - used - on a - pointer. - */ + ret = lttng_write( + relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT + sizeof + used + on a + pointer. + */ if (ret < 0) { PERROR("write connection pipe"); connection_put(new_conn); @@ -1517,12 +1519,11 @@ static void publish_connection_local_streams(struct relay_connection *conn) * session lock. */ pthread_mutex_lock(&session->lock); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node) { stream_publish(stream); } - rcu_read_unlock(); /* * Inform the viewer that there are new streams in the session. @@ -1530,6 +1531,7 @@ static void publish_connection_local_streams(struct relay_connection *conn) if (session->viewer_attached) { uatomic_set(&session->new_streams, 1); } + pthread_mutex_unlock(&session->lock); } @@ -2224,21 +2226,25 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, * to iterate over all streams to find the one associated with * the right session_id. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; - } - if (stream->trace->session->id == msg.session_id) { - pthread_mutex_lock(&stream->lock); - stream->data_pending_check_done = false; - pthread_mutex_unlock(&stream->lock); - DBG("Set begin data pending flag to stream %" PRIu64, - stream->stream_handle); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + if (!stream_get(stream)) { + continue; + } + + if (stream->trace->session->id == msg.session_id) { + pthread_mutex_lock(&stream->lock); + stream->data_pending_check_done = false; + pthread_mutex_unlock(&stream->lock); + DBG("Set begin data pending flag to stream %" PRIu64, + stream->stream_handle); + } + + stream_put(stream); } - stream_put(stream); } - rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); /* All good, send back reply. */ @@ -2299,43 +2305,49 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at * Iterate over all streams to see if the begin data pending * flag is set. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; - } - if (stream->trace->session->id != msg.session_id) { - stream_put(stream); - continue; - } - pthread_mutex_lock(&stream->lock); - if (!stream->data_pending_check_done) { - uint64_t stream_seq; + { + lttng::urcu::read_lock_guard read_lock; - if (session_streams_have_index(conn->session)) { - /* - * Ensure that both the index and stream data have been - * flushed up to the requested point. - */ - stream_seq = - std::min(stream->prev_data_seq, stream->prev_index_seq); - } else { - stream_seq = stream->prev_data_seq; + cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + if (!stream_get(stream)) { + continue; } - if (!stream->closed || - !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { - is_data_inflight = 1; - DBG("Data is still in flight for stream %" PRIu64, - stream->stream_handle); - pthread_mutex_unlock(&stream->lock); + + if (stream->trace->session->id != msg.session_id) { stream_put(stream); - break; + continue; + } + + pthread_mutex_lock(&stream->lock); + if (!stream->data_pending_check_done) { + uint64_t stream_seq; + + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = std::min(stream->prev_data_seq, + stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + + if (!stream->closed || + !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { + is_data_inflight = 1; + DBG("Data is still in flight for stream %" PRIu64, + stream->stream_handle); + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + break; + } } + + pthread_mutex_unlock(&stream->lock); + stream_put(stream); } - pthread_mutex_unlock(&stream->lock); - stream_put(stream); } - rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); /* All good, send back reply. */ @@ -4161,19 +4173,23 @@ restart: exit: error: /* Cleanup remaining connection object. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { - health_code_update(); + { + lttng::urcu::read_lock_guard read_lock; - session_abort(destroy_conn->session); + cds_lfht_for_each_entry ( + relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { + health_code_update(); - /* - * No need to grab another ref, because we own - * destroy_conn. - */ - relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn); + session_abort(destroy_conn->session); + + /* + * No need to grab another ref, because we own + * destroy_conn. + */ + relay_thread_close_connection( + &events, destroy_conn->sock->fd, destroy_conn); + } } - rcu_read_unlock(); (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); error_poll_create: