X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;h=3f40d03ed3d846cb8fa95788c94bf555be5fb28f;hb=HEAD;hp=ea3e3619106654d556bdbf543afca0ce75308bd2;hpb=28ab034a2c3582d07d3423d2d746731f87d3969f;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index ea3e36191..eca03b820 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -12,22 +12,25 @@ #include #include -#include #include #include #include #include #include #include +#include #include +#include #include #include +#include #include #include #include #include +#include #include #include #include @@ -61,9 +64,10 @@ static int add_channel(struct lttng_consumer_channel *channel, int ret = 0; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(ctx); - if (ctx->on_recv_channel != NULL) { + if (ctx->on_recv_channel != nullptr) { ret = ctx->on_recv_channel(channel); if (ret == 0) { ret = consumer_add_channel(channel, ctx); @@ -95,9 +99,10 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int *_alloc_ret) { int alloc_ret; - struct lttng_consumer_stream *stream = NULL; + struct lttng_consumer_stream *stream = nullptr; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(ctx); stream = consumer_stream_create(channel, @@ -111,7 +116,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, &alloc_ret, channel->type, channel->monitor); - if (stream == NULL) { + if (stream == nullptr) { switch (alloc_ret) { case -ENOENT: /* @@ -167,7 +172,8 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, stream->globally_visible = 1; cds_list_del_init(&stream->send_node); - ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); + ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); /* NOLINT sizeof used on a + pointer. */ if (ret < 0) { ERR("Consumer write %s stream to pipe %d", stream->metadata_flag ? "metadata" : "data", @@ -214,9 +220,10 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, int ret, cpu = 0; struct lttng_ust_ctl_consumer_stream *ustream; struct lttng_consumer_stream *stream; - pthread_mutex_t *current_stream_lock = NULL; + pthread_mutex_t *current_stream_lock = nullptr; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(ctx); /* @@ -302,7 +309,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } } pthread_mutex_unlock(&stream->lock); - current_stream_lock = NULL; + current_stream_lock = nullptr; } return 0; @@ -354,6 +361,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, struct lttng_ust_ctl_consumer_channel *ust_channel; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(attr); LTTNG_ASSERT(ust_chanp); LTTNG_ASSERT(channel->buffer_credentials.is_set); @@ -472,17 +480,20 @@ static int send_channel_to_sessiond_and_relayd(int sock, int *relayd_error) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; - struct lttng_consumer_stream *stream; uint64_t net_seq_idx = -1ULL; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(ctx); LTTNG_ASSERT(sock >= 0); DBG("UST consumer sending channel %s to sessiond", channel->name); if (channel->relayd_id != (uint64_t) -1ULL) { - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : + lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Try to send the stream to the relayd if one is available. */ @@ -528,7 +539,9 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* The channel was sent successfully to the sessiond at this point. */ - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Send stream to session daemon. */ @@ -539,7 +552,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* Tell sessiond there is no more stream. */ - ret = lttng_ust_ctl_send_stream_to_sessiond(sock, NULL); + ret = lttng_ust_ctl_send_stream_to_sessiond(sock, nullptr); if (ret < 0) { goto error; } @@ -571,6 +584,7 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, LTTNG_ASSERT(ctx); LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(attr); /* @@ -626,13 +640,15 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx) { int ret = 0; - struct lttng_consumer_stream *stream, *stmp; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(ctx); /* Send streams to the corresponding thread. */ - cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Sending the stream to the thread. */ @@ -659,13 +675,11 @@ static int flush_channel(uint64_t chan_key) { int ret = 0; struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht *ht; - struct lttng_ht_iter iter; + const auto ht = the_consumer_data.stream_per_chan_id_ht; DBG("UST consumer flush channel key %" PRIu64, chan_key); - rcu_read_lock(); + const lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(chan_key); if (!channel) { ERR("UST consumer flush channel %" PRIu64 " not found", chan_key); @@ -673,17 +687,15 @@ static int flush_channel(uint64_t chan_key) goto error; } - ht = the_consumer_data.stream_per_chan_id_ht; - /* For each stream of the channel id, flush it. */ - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -720,7 +732,6 @@ static int flush_channel(uint64_t chan_key) */ sample_and_send_channel_buffer_stats(channel); error: - rcu_read_unlock(); return ret; } @@ -732,42 +743,33 @@ error: */ static int clear_quiescent_channel(uint64_t chan_key) { - int ret = 0; - struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht *ht; - struct lttng_ht_iter iter; - DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key); - rcu_read_lock(); - channel = consumer_find_channel(chan_key); + const lttng::urcu::read_lock_guard read_lock; + auto channel = consumer_find_channel(chan_key); if (!channel) { ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key); - ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; - goto error; + return LTTNG_ERR_UST_CHAN_NOT_FOUND; } - ht = the_consumer_data.stream_per_chan_id_ht; + const auto ht = the_consumer_data.stream_per_chan_id_ht; /* For each stream of the channel id, clear quiescent state. */ - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { health_code_update(); - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); stream->quiescent = false; - pthread_mutex_unlock(&stream->lock); } -error: - rcu_read_unlock(); - return ret; + + return 0; } /* @@ -928,8 +930,10 @@ error: * the stream is still in the local stream list of the channel. This call * will make sure to clean that list. */ - consumer_stream_destroy(metadata->metadata_stream, NULL); - metadata->metadata_stream = NULL; + consumer_stream_destroy(metadata->metadata_stream, nullptr); + metadata->metadata_stream = nullptr; + metadata->metadata_pushed_wait_queue.wake_all(); + send_streams_error: error_no_stream: end: @@ -957,7 +961,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); - rcu_read_lock(); + const lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(!metadata_channel->monitor); @@ -967,7 +971,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1); if (ret < 0) { goto error; } @@ -1011,11 +1015,11 @@ error_stream: * Clean up the stream completely because the next snapshot will use a * new metadata stream. */ - consumer_stream_destroy(metadata_stream, NULL); - metadata_channel->metadata_stream = NULL; + consumer_stream_destroy(metadata_stream, nullptr); + metadata_channel->metadata_stream = nullptr; + metadata_channel->metadata_pushed_wait_queue.wake_all(); error: - rcu_read_unlock(); return ret; } @@ -1060,13 +1064,12 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, int ret; unsigned use_relayd = 0; unsigned long consumed_pos, produced_pos; - struct lttng_consumer_stream *stream; LTTNG_ASSERT(path); LTTNG_ASSERT(ctx); ASSERT_RCU_READ_LOCKED(); - rcu_read_lock(); + const lttng::urcu::read_lock_guard read_lock; if (relayd_id != (uint64_t) -1ULL) { use_relayd = 1; @@ -1075,11 +1078,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(!channel->monitor); DBG("UST consumer snapshot channel %" PRIu64, key); - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Lock stream because we are about to change its state. */ - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); LTTNG_ASSERT(channel->trace_chunk); if (!lttng_trace_chunk_get(channel->trace_chunk)) { /* @@ -1087,24 +1092,28 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, * holds a reference to the trace chunk. */ ERR("Failed to acquire reference to channel's trace chunk"); - ret = -1; - goto error_unlock; + return -1; } LTTNG_ASSERT(!stream->trace_chunk); stream->trace_chunk = channel->trace_chunk; stream->net_seq_idx = relayd_id; + /* Close stream output when were are done. */ + const auto close_stream_output = lttng::make_scope_exit( + [stream]() noexcept { consumer_stream_close_output(stream); }); + if (use_relayd) { ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { - goto error_close_stream; + return ret; } } else { ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto error_close_stream; + return ret; } + DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key); } @@ -1119,26 +1128,26 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, ", channel name = '%s'", channel->key, channel->name); - goto error_unlock; + return ret; } } ret = lttng_ustconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking UST snapshot"); - goto error_close_stream; + return ret; } ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced UST snapshot position"); - goto error_close_stream; + return ret; } ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd UST snapshot position"); - goto error_close_stream; + return ret; } /* @@ -1164,29 +1173,37 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, if (ret < 0) { if (ret != -EAGAIN) { PERROR("lttng_ust_ctl_get_subbuf snapshot"); - goto error_close_stream; + return ret; } + DBG("UST consumer get subbuf failed. Skipping it."); consumed_pos += stream->max_sb_size; stream->chan->lost_packets++; continue; } + /* Put the subbuffer once we are done. */ + const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept { + if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) { + ERR("Snapshot lttng_ust_ctl_put_subbuf"); + } + }); + ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len); if (ret < 0) { ERR("Snapshot lttng_ust_ctl_get_subbuf_size"); - goto error_put_subbuf; + return ret; } ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len); if (ret < 0) { ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size"); - goto error_put_subbuf; + return ret; } ret = get_current_subbuf_addr(stream, &subbuf_addr); if (ret) { - goto error_put_subbuf; + return ret; } subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len); @@ -1194,42 +1211,22 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { - ret = -EPERM; - goto error_put_subbuf; + return -EPERM; } } else { if (read_len != padded_len) { - ret = -EPERM; - goto error_put_subbuf; + return -EPERM; } } - ret = lttng_ust_ctl_put_subbuf(stream->ustream); - if (ret < 0) { - ERR("Snapshot lttng_ust_ctl_put_subbuf"); - goto error_close_stream; - } consumed_pos += stream->max_sb_size; } /* Simply close the stream so we can use it on the next snapshot. */ consumer_stream_close_output(stream); - pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); return 0; - -error_put_subbuf: - if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) { - ERR("Snapshot lttng_ust_ctl_put_subbuf"); - } -error_close_stream: - consumer_stream_close_output(stream); -error_unlock: - pthread_mutex_unlock(&stream->lock); - rcu_read_unlock(); - return ret; } static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream) @@ -1254,7 +1251,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t len, uint64_t version, struct lttng_consumer_channel *channel, - int timer, + bool invoked_by_timer, int wait) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; @@ -1306,7 +1303,7 @@ int lttng_ustconsumer_recv_metadata(int sock, * channel is under a snapshot session type. No need to update * the stream position in that scenario. */ - if (channel->metadata_stream != NULL) { + if (channel->metadata_stream != nullptr) { pthread_mutex_lock(&channel->metadata_stream->lock); metadata_stream_reset_cache_consumed_position(channel->metadata_stream); pthread_mutex_unlock(&channel->metadata_stream->lock); @@ -1342,13 +1339,8 @@ int lttng_ustconsumer_recv_metadata(int sock, if (!wait) { goto end_free; } - while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { - DBG("Waiting for metadata to be flushed"); - - health_code_update(); - usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); - } + consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer); end_free: free(metadata_str); @@ -1368,7 +1360,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int ret_func; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttcomm_consumer_msg msg; - struct lttng_consumer_channel *channel = NULL; + struct lttng_consumer_channel *channel = nullptr; health_code_update(); @@ -1400,14 +1392,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); /* relayd needs RCU read-side lock */ - rcu_read_lock(); + const lttng::urcu::read_lock_guard read_lock; switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { - uint32_t major = msg.u.relayd_sock.major; - uint32_t minor = msg.u.relayd_sock.minor; - enum lttcomm_sock_proto protocol = + const uint32_t major = msg.u.relayd_sock.major; + const uint32_t minor = msg.u.relayd_sock.minor; + const lttcomm_sock_proto protocol = (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol; /* Session daemon status message are handled in the following call. */ @@ -1425,14 +1417,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DESTROY_RELAYD: { - uint64_t index = msg.u.destroy_relayd.net_seq_idx; + const uint64_t index = msg.u.destroy_relayd.net_seq_idx; struct consumer_relayd_sock_pair *relayd; DBG("UST consumer destroying relayd %" PRIu64, index); /* Get relayd reference if exists. */ relayd = consumer_find_relayd(index); - if (relayd == NULL) { + if (relayd == nullptr) { DBG("Unable to find relayd %" PRIu64, index); ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } @@ -1455,14 +1447,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { - rcu_read_unlock(); return -ENOSYS; } case LTTNG_CONSUMER_DATA_PENDING: { int is_data_pending; ssize_t ret_send; - uint64_t id = msg.u.data_pending.session_id; + const uint64_t id = msg.u.data_pending.session_id; DBG("UST consumer data pending command for id %" PRIu64, id); @@ -1495,7 +1486,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_allocate_channel( msg.u.ask_channel.key, msg.u.ask_channel.session_id, - msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL, + msg.u.ask_channel.chunk_id.is_set ? &chunk_id : nullptr, msg.u.ask_channel.pathname, msg.u.ask_channel.name, msg.u.ask_channel.relayd_id, @@ -1635,7 +1626,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_GET_CHANNEL: { int ret, relayd_err = 0; - uint64_t key = msg.u.get_channel.key; + const uint64_t key = msg.u.get_channel.key; struct lttng_consumer_channel *found_channel; found_channel = consumer_find_channel(key); @@ -1695,7 +1686,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DESTROY_CHANNEL: { - uint64_t key = msg.u.destroy_channel.key; + const uint64_t key = msg.u.destroy_channel.key; /* * Only called if streams have not been sent to stream @@ -1741,10 +1732,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_PUSH_METADATA: { int ret; - uint64_t len = msg.u.push_metadata.len; - uint64_t key = msg.u.push_metadata.key; - uint64_t offset = msg.u.push_metadata.target_offset; - uint64_t version = msg.u.push_metadata.version; + const uint64_t len = msg.u.push_metadata.len; + const uint64_t key = msg.u.push_metadata.key; + const uint64_t offset = msg.u.push_metadata.target_offset; + const uint64_t version = msg.u.push_metadata.version; struct lttng_consumer_channel *found_channel; DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); @@ -1794,7 +1785,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); ret = lttng_ustconsumer_recv_metadata( - sock, key, offset, len, version, found_channel, 0, 1); + sock, key, offset, len, version, found_channel, false, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_push_metadata_fatal; @@ -1820,7 +1811,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: { struct lttng_consumer_channel *found_channel; - uint64_t key = msg.u.snapshot_channel.key; + const uint64_t key = msg.u.snapshot_channel.key; int ret_send; found_channel = consumer_find_channel(key); @@ -1869,41 +1860,36 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int ret = 0; uint64_t discarded_events; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - uint64_t id = msg.u.discarded_events.session_id; - uint64_t key = msg.u.discarded_events.channel_key; + const auto id = msg.u.discarded_events.session_id; + const auto key = msg.u.discarded_events.channel_key; DBG("UST consumer discarded events command for session id %" PRIu64, id); - rcu_read_lock(); - pthread_mutex_lock(&the_consumer_data.lock); - - ht = the_consumer_data.stream_list_ht; - - /* - * We only need a reference to the channel, but they are not - * directly indexed, so we just use the first matching stream - * to extract the information we need, we default to 0 if not - * found (no events are dropped if the channel is not yet in - * use). - */ - discarded_events = 0; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) { - if (stream->chan->key == key) { - discarded_events = stream->chan->discarded_events; - break; + const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock); + const auto ht = the_consumer_data.stream_list_ht; + + /* + * We only need a reference to the channel, but they are not + * directly indexed, so we just use the first matching stream + * to extract the information we need, we default to 0 if not + * found (no events are dropped if the channel is not yet in + * use). + */ + discarded_events = 0; + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>(*ht->ht, + &id, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct)) { + if (stream->chan->key == key) { + discarded_events = stream->chan->discarded_events; + break; + } } } - pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); DBG("UST consumer discarded events command for session id %" PRIu64 ", channel key %" PRIu64, @@ -1925,40 +1911,35 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int ret; uint64_t lost_packets; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - uint64_t id = msg.u.lost_packets.session_id; - uint64_t key = msg.u.lost_packets.channel_key; + const auto id = msg.u.lost_packets.session_id; + const auto key = msg.u.lost_packets.channel_key; DBG("UST consumer lost packets command for session id %" PRIu64, id); - rcu_read_lock(); - pthread_mutex_lock(&the_consumer_data.lock); - - ht = the_consumer_data.stream_list_ht; - - /* - * We only need a reference to the channel, but they are not - * directly indexed, so we just use the first matching stream - * to extract the information we need, we default to 0 if not - * found (no packets lost if the channel is not yet in use). - */ - lost_packets = 0; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) { - if (stream->chan->key == key) { - lost_packets = stream->chan->lost_packets; - break; + const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock); + const auto ht = the_consumer_data.stream_list_ht; + + /* + * We only need a reference to the channel, but they are not + * directly indexed, so we just use the first matching stream + * to extract the information we need, we default to 0 if not + * found (no packets lost if the channel is not yet in use). + */ + lost_packets = 0; + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_session_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>(*ht->ht, + &id, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct)) { + if (stream->chan->key == key) { + lost_packets = stream->chan->lost_packets; + break; + } } } - pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); DBG("UST consumer lost packets command for session id %" PRIu64 ", channel key %" PRIu64, @@ -2024,7 +2005,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ROTATE_CHANNEL: { struct lttng_consumer_channel *found_channel; - uint64_t key = msg.u.rotate_channel.key; + const uint64_t key = msg.u.rotate_channel.key; int ret_send_status; found_channel = consumer_find_channel(key); @@ -2077,7 +2058,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_CLEAR_CHANNEL: { struct lttng_consumer_channel *found_channel; - uint64_t key = msg.u.clear_channel.key; + const uint64_t key = msg.u.clear_channel.key; int ret_send_status; found_channel = consumer_find_channel(key); @@ -2131,8 +2112,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value; const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ? msg.u.create_trace_chunk.override_name : - NULL; - struct lttng_directory_handle *chunk_directory_handle = NULL; + nullptr; + struct lttng_directory_handle *chunk_directory_handle = nullptr; /* * The session daemon will only provide a chunk directory file @@ -2172,12 +2153,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret_code = lttng_consumer_create_trace_chunk( - !is_local_trace ? &relayd_id : NULL, + !is_local_trace ? &relayd_id : nullptr, msg.u.create_trace_chunk.session_id, msg.u.create_trace_chunk.chunk_id, (time_t) msg.u.create_trace_chunk.creation_timestamp, chunk_override_name, - msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL, + msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr, chunk_directory_handle); lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; @@ -2192,11 +2173,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int ret; ret_code = lttng_consumer_close_trace_chunk( - msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL, + msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr, msg.u.close_trace_chunk.session_id, msg.u.close_trace_chunk.chunk_id, (time_t) msg.u.close_trace_chunk.close_timestamp, - msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL, + msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr, closed_trace_chunk_path); reply.ret_code = ret_code; reply.path_length = strlen(closed_trace_chunk_path) + 1; @@ -2215,7 +2196,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value; ret_code = lttng_consumer_trace_chunk_exists( - msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL, + msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr, msg.u.trace_chunk_exists.session_id, msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; @@ -2279,7 +2260,7 @@ end_channel_error: { int ret_send_status; - ret_send_status = consumer_send_status_channel(sock, NULL); + ret_send_status = consumer_send_status_channel(sock, nullptr); if (ret_send_status < 0) { /* Stop everything if session daemon can not be notified. */ goto error_fatal; @@ -2295,7 +2276,6 @@ error_fatal: goto end; end: - rcu_read_unlock(); health_code_update(); return ret_func; } @@ -2559,7 +2539,9 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) ret = write_len; goto end; } + stream->ust_metadata_pushed += write_len; + stream->chan->metadata_pushed_wait_queue.wake_all(); LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); ret = write_len; @@ -2608,7 +2590,7 @@ lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { status = SYNC_METADATA_STATUS_ERROR; @@ -2864,7 +2846,7 @@ static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, subbuffer->buffer.buffer = lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size); - LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL); + LTTNG_ASSERT(subbuffer->buffer.buffer.data != nullptr); end: return ret; } @@ -3204,23 +3186,23 @@ end: */ void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - LTTNG_ASSERT(metadata_ht); LTTNG_ASSERT(metadata_ht->ht); DBG("UST consumer closing all metadata streams"); - rcu_read_lock(); - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*metadata_ht->ht)) { health_code_update(); pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->lock); lttng_ustconsumer_close_metadata(stream->chan); + pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); } - rcu_read_unlock(); } void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) @@ -3245,16 +3227,17 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, - int timer, + bool invoked_by_timer, int wait) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; - enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; + const lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; uint64_t len, key, offset, version; int ret; LTTNG_ASSERT(channel); + LTTNG_ASSERT(!channel->is_deleted); LTTNG_ASSERT(channel->metadata_cache); memset(&request, 0, sizeof(request)); @@ -3350,8 +3333,14 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = lttng_ustconsumer_recv_metadata( - ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait); + ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, + key, + offset, + len, + version, + channel, + invoked_by_timer, + wait); if (ret >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive