struct lttng_ht_node_u64 *node;
struct lttng_consumer_channel *channel = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
/* -1ULL keys are lookup failures */
if (key == (uint64_t) -1ULL) {
return NULL;
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
LTTNG_ASSERT(relayd);
+ ASSERT_RCU_READ_LOCKED();
/* Set destroy flag for this object */
uatomic_set(&relayd->destroy_flag, 1);
struct lttng_ht_iter iter;
LTTNG_ASSERT(relayd);
+ ASSERT_RCU_READ_LOCKED();
lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx,
&iter);
struct lttng_ht_node_u64 *node;
struct consumer_relayd_sock_pair *relayd = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
/* Negative keys are lookup failures */
if (key == (uint64_t) -1ULL) {
goto error;
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(relayd_sock);
+ ASSERT_RCU_READ_LOCKED();
DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
struct lttng_ht_iter iter;
struct consumer_relayd_sock_pair *relayd = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
/* Iterate over all relayd since they are indexed by net_seq_idx. */
cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
relayd, node.node) {
struct lttng_dynamic_pointer_array streams_packet_to_open;
size_t stream_idx;
+ ASSERT_RCU_READ_LOCKED();
+
DBG("Consumer sample rotate position for channel %" PRIu64, key);
lttng_dynamic_array_init(&stream_rotation_positions,
struct lttng_ht_iter iter;
struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ ASSERT_RCU_READ_LOCKED();
+
rcu_read_lock();
DBG("Consumer rotate ready streams in channel %" PRIu64, key);