if (key < 0)
return NULL;
+ rcu_read_lock();
+
lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
stream = caa_container_of(node, struct lttng_consumer_stream, node);
}
+ rcu_read_unlock();
+
return stream;
}
if (key < 0)
return NULL;
+ rcu_read_lock();
+
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
+ rcu_read_unlock();
+
return channel;
}
goto end;
}
+ rcu_read_lock();
+
/* Get stream node from hash table */
lttng_ht_lookup(consumer_data.stream_ht,
(void *)((unsigned long) stream->key), &iter);
ret = lttng_ht_del(consumer_data.stream_ht, &iter);
assert(!ret);
+ rcu_read_unlock();
+
if (consumer_data.stream_count <= 0) {
goto end;
}
consumer_del_channel(free_chan);
}
+static void consumer_del_stream_rcu(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
+
+ consumer_del_stream(stream);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(
int channel_key, int stream_key,
int shm_fd, int wait_fd,
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
+ rcu_read_lock();
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+ rcu_read_unlock();
consumer_data.stream_count++;
consumer_data.need_update = 1;
goto end;
}
+ rcu_read_lock();
+
lttng_ht_lookup(consumer_data.channel_ht,
(void *)((unsigned long) channel->key), &iter);
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
assert(!ret);
+ rcu_read_unlock();
+
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
pthread_mutex_unlock(&consumer_data.lock);
}
+static void consumer_del_channel_rcu(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_channel *channel=
+ caa_container_of(node, struct lttng_consumer_channel, node);
+
+ consumer_del_channel(channel);
+}
+
struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
pthread_mutex_lock(&consumer_data.lock);
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
+ rcu_read_lock();
lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+ rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
return 0;
}
{
int ret;
struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
- struct lttng_consumer_channel *channel;
+ struct lttng_ht_node_ulong *node;
+
+ rcu_read_lock();
/*
- * close all outfd. Called when there are no more threads
- * running (after joining on the threads), no need to protect
- * list iteration with mutex.
+ * close all outfd. Called when there are no more threads running (after
+ * joining on the threads), no need to protect list iteration with mutex.
*/
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
- node.node) {
+ cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
+ node) {
ret = lttng_ht_del(consumer_data.stream_ht, &iter);
assert(!ret);
- consumer_del_stream(stream);
+ call_rcu(&node->head, consumer_del_stream_rcu);
}
- cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
- node.node) {
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
+ node) {
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
assert(!ret);
- consumer_del_channel(channel);
+ call_rcu(&node->head, consumer_del_channel_rcu);
}
+
+ rcu_read_unlock();
}
/*
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
} else if ((pollfd[i].revents & POLLHUP) &&
!(pollfd[i].revents & POLLIN)) {
} else {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
}
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
}
}