X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=27b34a39f249071867e49e9df7fb438517bb6ee8;hb=56047f5a23df5c2c583a102b8015bbec5a7da9f1;hp=4d64b0ea2be2b7f531f5fc03c1eef4573d2eefc8;hpb=66cefebdc240cbae0bc79594305f509b0779fa98;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 4d64b0ea2..27b34a39f 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -202,7 +203,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht * return nullptr; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); @@ -210,8 +211,6 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht * stream = lttng::utils::container_of(node, <tng_consumer_stream::node); } - rcu_read_unlock(); - return stream; } @@ -219,7 +218,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; stream = find_stream(key, ht); if (stream) { stream->key = (uint64_t) -1ULL; @@ -230,7 +229,6 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) */ stream->node.key = (uint64_t) -1ULL; } - rcu_read_unlock(); } /* @@ -273,7 +271,7 @@ static void steal_channel_key(uint64_t key) { struct lttng_consumer_channel *channel; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(key); if (channel) { channel->key = (uint64_t) -1ULL; @@ -284,7 +282,6 @@ static void steal_channel_key(uint64_t key) */ channel->node.key = (uint64_t) -1ULL; } - rcu_read_unlock(); } static void free_channel_rcu(struct rcu_head *head) @@ -404,7 +401,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->is_published) { int ret; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; iter.iter.node = &channel->node.node; ret = lttng_ht_del(the_consumer_data.channel_ht, &iter); LTTNG_ASSERT(!ret); @@ -412,7 +409,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) iter.iter.node = &channel->channels_by_session_id_ht_node.node; ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter); LTTNG_ASSERT(!ret); - rcu_read_unlock(); } channel->is_deleted = true; @@ -431,14 +427,15 @@ static void cleanup_relayd_ht() struct lttng_ht_iter iter; struct consumer_relayd_sock_pair *relayd; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { - consumer_destroy_relayd(relayd); + cds_lfht_for_each_entry ( + the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { + consumer_destroy_relayd(relayd); + } } - rcu_read_unlock(); - lttng_ht_destroy(the_consumer_data.relayd_ht); } @@ -457,7 +454,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Let's begin with metadata */ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { @@ -474,7 +471,6 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, DBG("Delete flag set to data stream %d", stream->wait_fd); } } - rcu_read_unlock(); } /* @@ -581,7 +577,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Steal stream identifier to avoid having streams with the same key */ steal_stream_key(stream->key, ht); @@ -614,7 +610,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) the_consumer_data.stream_count++; the_consumer_data.need_update = 1; - rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); @@ -720,7 +715,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path LTTNG_ASSERT(path); /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -757,7 +752,6 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path stream->net_seq_idx); end: - rcu_read_unlock(); return ret; } @@ -774,7 +768,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) LTTNG_ASSERT(net_seq_idx != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -797,7 +791,6 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) DBG("All streams sent relayd id %" PRIu64, net_seq_idx); end: - rcu_read_unlock(); return ret; } @@ -809,12 +802,11 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) struct consumer_relayd_sock_pair *relayd; /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd) { consumer_stream_relayd_close(stream, relayd); } - rcu_read_unlock(); } /* @@ -1127,11 +1119,10 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ steal_channel_key(channel->key); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node); lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht, &channel->channels_by_session_id_ht_node); - rcu_read_unlock(); channel->is_published = true; pthread_mutex_unlock(&channel->timer_lock); @@ -1169,35 +1160,34 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, DBG("Updating poll fd array"); *nb_inactive_fd = 0; - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Only active streams with an active end point can be added to the - * poll set and local stream storage of the thread. - * - * There is a potential race here for endpoint_status to be updated - * just after the check. However, this is OK since the stream(s) will - * be deleted once the thread is notified that the end point state has - * changed where this function will be called back again. - * - * We track the number of inactive FDs because they still need to be - * closed by the polling thread after a wakeup on the data_pipe or - * metadata_pipe. - */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { - (*nb_inactive_fd)++; - continue; + + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Only active streams with an active end point can be added to the + * poll set and local stream storage of the thread. + * + * There is a potential race here for endpoint_status to be updated + * just after the check. However, this is OK since the stream(s) will + * be deleted once the thread is notified that the end point state has + * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. + */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; + continue; + } + + (*pollfd)[i].fd = stream->wait_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_stream[i] = stream; + i++; } - /* - * This clobbers way too much the debug output. Uncomment that if you - * need it for debugging purposes. - */ - (*pollfd)[i].fd = stream->wait_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_stream[i] = stream; - i++; } - rcu_read_unlock(); /* * Insert the consumer_data_pipe at the end of the array and don't @@ -1278,14 +1268,15 @@ void lttng_consumer_cleanup() struct lttng_consumer_channel *channel; unsigned int trace_chunks_left; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - consumer_del_channel(channel); + cds_lfht_for_each_entry ( + the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { + consumer_del_channel(channel); + } } - rcu_read_unlock(); - lttng_ht_destroy(the_consumer_data.channel_ht); lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht); @@ -1486,15 +1477,16 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -1512,15 +1504,16 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -1622,7 +1615,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre size_t write_len; /* RCU lock for the relayd pointer */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk); /* Flag that the current stream if set for network streaming. */ @@ -1762,7 +1755,6 @@ end: pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - rcu_read_unlock(); return ret; } @@ -1801,7 +1793,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data } /* RCU lock for the relayd pointer */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != (uint64_t) -1ULL) { @@ -1984,7 +1976,6 @@ end: pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - rcu_read_unlock(); return written; } @@ -2205,7 +2196,7 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) * after this point. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* * Lookup the stream just to make sure it does not exist in our internal @@ -2239,8 +2230,6 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) */ lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id); - rcu_read_unlock(); - pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&stream->chan->timer_lock); @@ -2257,16 +2246,18 @@ static void validate_endpoint_status_data_stream() DBG("Consumer delete flagged data stream"); - rcu_read_lock(); - cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; + } + /* Delete it right now */ + consumer_del_stream(stream, data_ht); } - /* Delete it right now */ - consumer_del_stream(stream, data_ht); } - rcu_read_unlock(); } /* @@ -2281,22 +2272,23 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po LTTNG_ASSERT(pollset); - rcu_read_lock(); - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; - } - /* - * Remove from pollset so the metadata thread can continue without - * blocking on a deleted stream. - */ - lttng_poll_del(pollset, stream->wait_fd); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; + } + /* + * Remove from pollset so the metadata thread can continue without + * blocking on a deleted stream. + */ + lttng_poll_del(pollset, stream->wait_fd); - /* Delete it right now */ - consumer_del_metadata_stream(stream, metadata_ht); + /* Delete it right now */ + consumer_del_metadata_stream(stream, metadata_ht); + } } - rcu_read_unlock(); } /* @@ -2431,7 +2423,7 @@ void *consumer_thread_metadata_poll(void *data) continue; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; @@ -2495,11 +2487,9 @@ void *consumer_thread_metadata_poll(void *data) consumer_del_metadata_stream(stream, metadata_ht); } else { ERR("Unexpected poll events %u for sock %d", revents, pollfd); - rcu_read_unlock(); goto end; } /* Release RCU lock for the stream looked up */ - rcu_read_unlock(); } } @@ -2838,7 +2828,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe ht = the_consumer_data.stream_per_chan_id_ht; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -2878,7 +2868,6 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe next: pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); } static void destroy_channel_ht(struct lttng_ht *ht) @@ -2891,12 +2880,14 @@ static void destroy_channel_ht(struct lttng_ht *ht) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { - ret = lttng_ht_del(ht, &iter); - LTTNG_ASSERT(ret != 0); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { + ret = lttng_ht_del(ht, &iter); + LTTNG_ASSERT(ret != 0); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -2998,19 +2989,20 @@ void *consumer_thread_channel_poll(void *data) switch (action) { case CONSUMER_CHANNEL_ADD: + { DBG("Adding channel %d to poll set", chan->wait_fd); lttng_ht_node_init_u64(&chan->wait_fd_node, chan->wait_fd); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(channel_ht, &chan->wait_fd_node); - rcu_read_unlock(); /* Add channel to the global poll events list */ // FIXME: Empty flag on a pipe pollset, this might // hang on FreeBSD. lttng_poll_add(&events, chan->wait_fd, 0); break; + } case CONSUMER_CHANNEL_DEL: { /* @@ -3023,10 +3015,9 @@ void *consumer_thread_channel_poll(void *data) * GET_CHANNEL failed. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; chan = consumer_find_channel(key); if (!chan) { - rcu_read_unlock(); ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key); @@ -3059,7 +3050,6 @@ void *consumer_thread_channel_poll(void *data) if (!uatomic_sub_return(&chan->refcount, 1)) { consumer_del_channel(chan); } - rcu_read_unlock(); goto restart; } case CONSUMER_CHANNEL_QUIT: @@ -3093,7 +3083,7 @@ void *consumer_thread_channel_poll(void *data) continue; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; @@ -3126,12 +3116,10 @@ void *consumer_thread_channel_poll(void *data) } } else { ERR("Unexpected poll events %u for sock %d", revents, pollfd); - rcu_read_unlock(); goto end; } /* Release RCU lock for the channel looked up */ - rcu_read_unlock(); } } @@ -3773,7 +3761,7 @@ int consumer_data_pending(uint64_t id) DBG("Consumer data pending command on session id %" PRIu64, id); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&the_consumer_data.lock); switch (the_consumer_data.type) { @@ -3887,13 +3875,11 @@ int consumer_data_pending(uint64_t id) data_not_pending: /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); return 0; data_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); return 1; } @@ -4020,7 +4006,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, nullptr); lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); LTTNG_ASSERT(channel->trace_chunk); @@ -4372,7 +4358,6 @@ end_unlock_stream: end_unlock_channel: pthread_mutex_unlock(&channel->lock); end: - rcu_read_unlock(); lttng_dynamic_array_reset(&stream_rotation_positions); lttng_dynamic_pointer_array_reset(&streams_packet_to_open); return ret; @@ -4459,7 +4444,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha int ret; struct lttng_consumer_stream *stream; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); @@ -4471,13 +4456,11 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha pthread_mutex_unlock(&stream->lock); } pthread_mutex_unlock(&channel->lock); - rcu_read_unlock(); return 0; error_unlock: pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&channel->lock); - rcu_read_unlock(); return ret; } @@ -4673,7 +4656,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, ASSERT_RCU_READ_LOCKED(); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; DBG("Consumer rotate ready streams in channel %" PRIu64, key); @@ -4708,7 +4691,6 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, ret = 0; end: - rcu_read_unlock(); return ret; } @@ -4837,46 +4819,50 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, goto error; } - rcu_read_lock(); - cds_lfht_for_each_entry_duplicate( - the_consumer_data.channels_by_session_id_ht->ht, - the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed), - the_consumer_data.channels_by_session_id_ht->match_fct, - &session_id, - &iter.iter, - channel, - channels_by_session_id_ht_node.node) { - ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); - if (ret) { - /* - * Roll-back the creation of this chunk. - * - * This is important since the session daemon will - * assume that the creation of this chunk failed and - * will never ask for it to be closed, resulting - * in a leak and an inconsistent state for some - * channels. - */ - enum lttcomm_return_code close_ret; - char path[LTTNG_PATH_MAX]; + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry_duplicate( + the_consumer_data.channels_by_session_id_ht->ht, + the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, + lttng_ht_seed), + the_consumer_data.channels_by_session_id_ht->match_fct, + &session_id, + &iter.iter, + channel, + channels_by_session_id_ht_node.node) + { + ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); + if (ret) { + /* + * Roll-back the creation of this chunk. + * + * This is important since the session daemon will + * assume that the creation of this chunk failed and + * will never ask for it to be closed, resulting + * in a leak and an inconsistent state for some + * channels. + */ + enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; + + DBG("Failed to set new trace chunk on existing channels, rolling back"); + close_ret = + lttng_consumer_close_trace_chunk(relayd_id, + session_id, + chunk_id, + chunk_creation_timestamp, + nullptr, + path); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + session_id, + chunk_id); + } - DBG("Failed to set new trace chunk on existing channels, rolling back"); - close_ret = lttng_consumer_close_trace_chunk(relayd_id, - session_id, - chunk_id, - chunk_creation_timestamp, - nullptr, - path); - if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { - ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 - ", chunk_id = %" PRIu64, - session_id, - chunk_id); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + break; } - - ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - break; } } @@ -4914,7 +4900,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, } } error_unlock: - rcu_read_unlock(); error: /* Release the reference returned by the "publish" operation. */ lttng_trace_chunk_put(published_chunk); @@ -4990,30 +4975,32 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, * it; it is only kept around to compare it (by address) to the * current chunk found in the session's channels. */ - rcu_read_lock(); - cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - int ret; + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry ( + the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { + int ret; - /* - * Only change the channel's chunk to NULL if it still - * references the chunk being closed. The channel may - * reference a newer channel in the case of a session - * rotation. When a session rotation occurs, the "next" - * chunk is created before the "current" chunk is closed. - */ - if (channel->trace_chunk != chunk) { - continue; - } - ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); - if (ret) { /* - * Attempt to close the chunk on as many channels as - * possible. + * Only change the channel's chunk to NULL if it still + * references the chunk being closed. The channel may + * reference a newer channel in the case of a session + * rotation. When a session rotation occurs, the "next" + * chunk is created before the "current" chunk is closed. */ - ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + if (channel->trace_chunk != chunk) { + continue; + } + ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); + if (ret) { + /* + * Attempt to close the chunk on as many channels as + * possible. + */ + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + } } } - if (relayd_id) { int ret; struct consumer_relayd_sock_pair *relayd; @@ -5033,7 +5020,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, } } error_unlock: - rcu_read_unlock(); end: /* * Release the reference returned by the "find" operation and @@ -5055,6 +5041,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id const bool is_local_trace = !relayd_id; struct consumer_relayd_sock_pair *relayd = nullptr; bool chunk_exists_local, chunk_exists_remote; + lttng::urcu::read_lock_guard read_lock; if (relayd_id) { /* Only used for logging purposes. */ @@ -5087,7 +5074,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id goto end; } - rcu_read_lock(); relayd = consumer_find_relayd(*relayd_id); if (!relayd) { ERR("Failed to find relayd %" PRIu64, *relayd_id); @@ -5109,7 +5095,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist"); end_rcu_unlock: - rcu_read_unlock(); end: return ret_code; } @@ -5123,7 +5108,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann ht = the_consumer_data.stream_per_chan_id_ht; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -5146,12 +5131,10 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann next: pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); return LTTCOMM_CONSUMERD_SUCCESS; error_unlock: pthread_mutex_unlock(&stream->lock); - rcu_read_unlock(); return ret; } @@ -5192,56 +5175,56 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum goto end; } - rcu_read_lock(); - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { - enum consumer_stream_open_packet_status status; + { + lttng::urcu::read_lock_guard read_lock; + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + enum consumer_stream_open_packet_status status; - pthread_mutex_lock(&stream->lock); - if (cds_lfht_is_node_deleted(&stream->node.node)) { - goto next; - } + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } - status = consumer_stream_open_packet(stream); - switch (status) { - case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: - DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = true; - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: - DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: - /* - * Only unexpected internal errors can lead to this - * failing. Report an unknown error. - */ - ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 - ", channel id = %" PRIu64 ", channel name = %s" - ", session id = %" PRIu64, - stream->key, - channel->key, - channel->name, - channel->session_id); - ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; - goto error_unlock; - default: - abort(); - } + status = consumer_stream_open_packet(stream); + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* + * Only unexpected internal errors can lead to this + * failing. Report an unknown error. + */ + ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 + ", channel id = %" PRIu64 ", channel name = %s" + ", session id = %" PRIu64, + stream->key, + channel->key, + channel->name, + channel->session_id); + ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; + goto error_unlock; + default: + abort(); + } - next: - pthread_mutex_unlock(&stream->lock); + next: + pthread_mutex_unlock(&stream->lock); + } } - end_rcu_unlock: - rcu_read_unlock(); end: return ret;