#include <common/defaults.hpp>
#include <common/fs-handle.hpp>
#include <common/sessiond-comm/relayd.hpp>
+#include <common/urcu.hpp>
#include <common/utils.hpp>
#include <algorithm>
struct lttng_ht_iter iter;
struct relay_stream *stream = nullptr;
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (!node) {
DBG("Relay stream %" PRIu64 " not found", stream_id);
goto end;
stream = nullptr;
}
end:
- rcu_read_unlock();
return stream;
}
*/
static int try_rotate_stream_index(struct relay_stream *stream)
{
- int ret = 0;
+ const int ret = 0;
if (!stream->ongoing_rotation.is_set) {
/* No rotation expected. */
void stream_put(struct relay_stream *stream)
{
- rcu_read_lock();
+ const lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
* actually putting our last stream reference.
*/
urcu_ref_put(&stream->ref, stream_release);
- rcu_read_unlock();
}
int stream_set_pending_rotation(struct relay_stream *stream,
recv_len = packet ? packet->size : 0;
recv_len += padding_len;
stream->metadata_received += recv_len;
- if (recv_len) {
- stream->no_new_metadata_notified = false;
- }
}
DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
static void print_stream_indexes(struct relay_stream *stream)
{
- struct lttng_ht_iter iter;
- struct relay_index *index;
-
- rcu_read_lock();
- cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) {
+ for (auto *index :
+ lttng::urcu::lfht_iteration_adapter<relay_index,
+ decltype(relay_index::index_n),
+ &relay_index::index_n>(*stream->indexes_ht->ht)) {
DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
" stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64,
index,
index->stream->trace->id,
index->stream->trace->session->id);
}
- rcu_read_unlock();
}
int stream_reset_file(struct relay_stream *stream)
void print_relay_streams()
{
- struct lttng_ht_iter iter;
- struct relay_stream *stream;
-
if (!relay_streams_ht) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<relay_stream,
+ decltype(relay_stream::node),
+ &relay_stream::node>(*relay_streams_ht->ht)) {
if (!stream_get(stream)) {
continue;
}
+
DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64,
stream,
stream->ref.refcount,
print_stream_indexes(stream);
stream_put(stream);
}
- rcu_read_unlock();
}