X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Findex.cpp;h=426a34b2470e3b7acf0053bbba60fd58edc9b751;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hp=ef00a6eca103e1a3e53eb0e5470f00312b584aa0;hpb=ac497a37018f3c253d2e50397294f58d33f7f24f;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/index.cpp b/src/bin/lttng-relayd/index.cpp index ef00a6eca..426a34b24 100644 --- a/src/bin/lttng-relayd/index.cpp +++ b/src/bin/lttng-relayd/index.cpp @@ -9,14 +9,14 @@ #define _LGPL_SOURCE -#include -#include -#include +#include "connection.hpp" +#include "index.hpp" +#include "lttng-relayd.hpp" +#include "stream.hpp" -#include "lttng-relayd.h" -#include "stream.h" -#include "index.h" -#include "connection.h" +#include +#include +#include /* * Allocate a new relay index object. Pass the stream in which it is @@ -26,15 +26,15 @@ * Called with stream mutex held. * Return allocated object or else NULL on error. */ -static struct relay_index *relay_index_create(struct relay_stream *stream, - uint64_t net_seq_num) +static struct relay_index *relay_index_create(struct relay_stream *stream, uint64_t net_seq_num) { struct relay_index *index; DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64, - stream->stream_handle, net_seq_num); + stream->stream_handle, + net_seq_num); - index = (relay_index *) zmalloc(sizeof(*index)); + index = zmalloc(); if (!index) { PERROR("Relay index zmalloc"); goto end; @@ -62,21 +62,24 @@ end: * RCU read side lock MUST be acquired. */ static struct relay_index *relay_index_add_unique(struct relay_stream *stream, - struct relay_index *index) + struct relay_index *index) { struct cds_lfht_node *node_ptr; struct relay_index *_index; + ASSERT_RCU_READ_LOCKED(); + DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - stream->stream_handle, index->index_n.key); + stream->stream_handle, + index->index_n.key); node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht, - stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed), - stream->indexes_ht->match_fct, &index->index_n, - &index->index_n.node); + stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed), + stream->indexes_ht->match_fct, + &index->index_n, + &index->index_n.node); if (node_ptr != &index->index_n.node) { - _index = caa_container_of(node_ptr, struct relay_index, - index_n.node); + _index = caa_container_of(node_ptr, struct relay_index, index_n.node); } else { _index = NULL; } @@ -88,9 +91,12 @@ static struct relay_index *relay_index_add_unique(struct relay_stream *stream, */ static bool relay_index_get(struct relay_index *index) { + ASSERT_RCU_READ_LOCKED(); + DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", - index->stream->stream_handle, index->index_n.key, - (int) index->ref.refcount); + index->stream->stream_handle, + index->index_n.key, + (int) index->ref.refcount); return urcu_ref_get_unless_zero(&index->ref); } @@ -103,27 +109,29 @@ static bool relay_index_get(struct relay_index *index) * Return index object or else NULL on error. */ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, - uint64_t net_seq_num) + uint64_t net_seq_num) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; struct relay_index *index = NULL; DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64, - stream->stream_handle, net_seq_num); + stream->stream_handle, + net_seq_num); rcu_read_lock(); lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node) { - index = caa_container_of(node, struct relay_index, index_n); + index = lttng::utils::container_of(node, &relay_index::index_n); } else { struct relay_index *oldindex; index = relay_index_create(stream, net_seq_num); if (!index) { ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64, - stream->stream_handle, net_seq_num); + stream->stream_handle, + net_seq_num); goto end; } oldindex = relay_index_add_unique(stream, index); @@ -142,13 +150,15 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, end: rcu_read_unlock(); DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, - (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num); + (index == NULL) ? "NOT " : "", + stream->stream_handle, + net_seq_num); return index; } int relay_index_set_file(struct relay_index *index, - struct lttng_index_file *index_file, - uint64_t data_offset) + struct lttng_index_file *index_file, + uint64_t data_offset) { int ret = 0; @@ -165,8 +175,7 @@ end: return ret; } -int relay_index_set_data(struct relay_index *index, - const struct ctf_packet_index *data) +int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data) { int ret = 0; @@ -195,8 +204,7 @@ static void index_destroy(struct relay_index *index) static void index_destroy_rcu(struct rcu_head *rcu_head) { - struct relay_index *index = - caa_container_of(rcu_head, struct relay_index, rcu_node); + struct relay_index *index = lttng::utils::container_of(rcu_head, &relay_index::rcu_node); index_destroy(index); } @@ -204,7 +212,7 @@ static void index_destroy_rcu(struct rcu_head *rcu_head) /* Stream lock must be held by the caller. */ static void index_release(struct urcu_ref *ref) { - struct relay_index *index = caa_container_of(ref, struct relay_index, ref); + struct relay_index *index = lttng::utils::container_of(ref, &relay_index::ref); struct relay_stream *stream = index->stream; int ret; struct lttng_ht_iter iter; @@ -235,10 +243,11 @@ static void index_release(struct urcu_ref *ref) void relay_index_put(struct relay_index *index) { DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", - index->stream->stream_handle, index->index_n.key, - (int) index->ref.refcount); + index->stream->stream_handle, + index->index_n.key, + (int) index->ref.refcount); /* - * Ensure existance of index->lock for index unlock. + * Ensure existence of index->lock for index unlock. */ rcu_read_lock(); /* @@ -273,7 +282,8 @@ int relay_index_try_flush(struct relay_index *index) } DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64, - index->stream->stream_handle, index->index_n.key); + index->stream->stream_handle, + index->index_n.key); flushed = true; index->flushed = true; ret = lttng_index_file_write(index->index_file, &index->index_data); @@ -297,8 +307,7 @@ void relay_index_close_all(struct relay_stream *stream) struct relay_index *index; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { /* Put self-ref from index. */ relay_index_put(index); } @@ -311,8 +320,7 @@ void relay_index_close_partial_fd(struct relay_stream *stream) struct relay_index *index; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { if (!index->index_file) { continue; } @@ -333,10 +341,8 @@ uint64_t relay_index_find_last(struct relay_stream *stream) uint64_t net_seq_num = -1ULL; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { - if (net_seq_num == -1ULL || - index->index_n.key > net_seq_num) { + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { + if (net_seq_num == -1ULL || index->index_n.key > net_seq_num) { net_seq_num = index->index_n.key; } } @@ -348,10 +354,9 @@ uint64_t relay_index_find_last(struct relay_stream *stream) * Update the index file of an already existing relay_index. * Offsets by 'removed_data_count' the offset field of an index. */ -static -int relay_index_switch_file(struct relay_index *index, - struct lttng_index_file *new_index_file, - uint64_t removed_data_count) +static int relay_index_switch_file(struct relay_index *index, + struct lttng_index_file *new_index_file, + uint64_t removed_data_count) { int ret = 0; uint64_t offset; @@ -386,10 +391,9 @@ int relay_index_switch_all_files(struct relay_stream *stream) int ret = 0; rcu_read_lock(); - cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, - index, index_n.node) { - ret = relay_index_switch_file(index, stream->index_file, - stream->pos_after_last_complete_data_index); + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { + ret = relay_index_switch_file( + index, stream->index_file, stream->pos_after_last_complete_data_index); if (ret) { goto end; } @@ -403,11 +407,11 @@ end: * Set index data from the control port to a given index object. */ int relay_index_set_control_data(struct relay_index *index, - const struct lttcomm_relayd_index *data, - unsigned int minor_version) + const struct lttcomm_relayd_index *data, + unsigned int minor_version) { /* The index on disk is encoded in big endian. */ - ctf_packet_index index_data {}; + ctf_packet_index index_data{}; index_data.packet_size = htobe64(data->packet_size); index_data.content_size = htobe64(data->content_size);