/*
 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */
#include "connection.hpp"
#include "lttng-relayd.hpp"

#include <common/common.hpp>
#include <common/compat/endian.hpp>
#include <common/urcu.hpp>
#include <common/utils.hpp>

#include <cstdlib>
23 * Allocate a new relay index object. Pass the stream in which it is
24 * contained as parameter. The sequence number will be used as the hash
27 * Called with stream mutex held.
28 * Return allocated object or else NULL on error.
30 static struct relay_index
*relay_index_create(struct relay_stream
*stream
, uint64_t net_seq_num
)
32 struct relay_index
*index
;
34 DBG2("Creating relay index for stream id %" PRIu64
" and seqnum %" PRIu64
,
35 stream
->stream_handle
,
38 index
= zmalloc
<relay_index
>();
40 PERROR("Relay index zmalloc");
43 if (!stream_get(stream
)) {
44 ERR("Cannot get stream");
49 index
->stream
= stream
;
51 lttng_ht_node_init_u64(&index
->index_n
, net_seq_num
);
52 pthread_mutex_init(&index
->lock
, nullptr);
53 urcu_ref_init(&index
->ref
);
60 * Add unique relay index to the given hash table. In case of a collision, the
61 * already existing object is put in the given _index variable.
63 * RCU read side lock MUST be acquired.
65 static struct relay_index
*relay_index_add_unique(struct relay_stream
*stream
,
66 struct relay_index
*index
)
68 struct cds_lfht_node
*node_ptr
;
69 struct relay_index
*_index
;
71 ASSERT_RCU_READ_LOCKED();
73 DBG2("Adding relay index with stream id %" PRIu64
" and seqnum %" PRIu64
,
74 stream
->stream_handle
,
77 node_ptr
= cds_lfht_add_unique(stream
->indexes_ht
->ht
,
78 stream
->indexes_ht
->hash_fct(&index
->index_n
.key
,
80 stream
->indexes_ht
->match_fct
,
82 &index
->index_n
.node
);
83 if (node_ptr
!= &index
->index_n
.node
) {
84 _index
= lttng_ht_node_container_of(node_ptr
, &relay_index::index_n
);
92 * Should be called with RCU read-side lock held.
94 static bool relay_index_get(struct relay_index
*index
)
96 ASSERT_RCU_READ_LOCKED();
98 DBG2("index get for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
99 index
->stream
->stream_handle
,
101 (int) index
->ref
.refcount
);
103 return urcu_ref_get_unless_zero(&index
->ref
);
107 * Get a relayd index in within the given stream, or create it if not
110 * Called with stream mutex held.
111 * Return index object or else NULL on error.
113 struct relay_index
*relay_index_get_by_id_or_create(struct relay_stream
*stream
,
114 uint64_t net_seq_num
)
116 struct lttng_ht_node_u64
*node
;
117 struct lttng_ht_iter iter
;
118 struct relay_index
*index
= nullptr;
120 DBG3("Finding index for stream id %" PRIu64
" and seq_num %" PRIu64
,
121 stream
->stream_handle
,
124 const lttng::urcu::read_lock_guard read_lock
;
125 lttng_ht_lookup(stream
->indexes_ht
, &net_seq_num
, &iter
);
126 node
= lttng_ht_iter_get_node
<lttng_ht_node_u64
>(&iter
);
128 index
= lttng::utils::container_of(node
, &relay_index::index_n
);
130 struct relay_index
*oldindex
;
132 index
= relay_index_create(stream
, net_seq_num
);
134 ERR("Cannot create index for stream id %" PRIu64
" and seq_num %" PRIu64
,
135 stream
->stream_handle
,
139 oldindex
= relay_index_add_unique(stream
, index
);
141 /* Added concurrently, keep old. */
142 relay_index_put(index
);
144 if (!relay_index_get(index
)) {
148 stream
->indexes_in_flight
++;
149 index
->in_hash_table
= true;
153 DBG2("Index %sfound or created in HT for stream ID %" PRIu64
" and seqnum %" PRIu64
,
154 (index
== NULL
) ? "NOT " : "",
155 stream
->stream_handle
,
160 int relay_index_set_file(struct relay_index
*index
,
161 struct lttng_index_file
*index_file
,
162 uint64_t data_offset
)
166 pthread_mutex_lock(&index
->lock
);
167 if (index
->index_file
) {
171 lttng_index_file_get(index_file
);
172 index
->index_file
= index_file
;
173 index
->index_data
.offset
= data_offset
;
175 pthread_mutex_unlock(&index
->lock
);
179 int relay_index_set_data(struct relay_index
*index
, const struct ctf_packet_index
*data
)
183 pthread_mutex_lock(&index
->lock
);
184 if (index
->has_index_data
) {
188 /* Set everything except data_offset. */
189 index
->index_data
.packet_size
= data
->packet_size
;
190 index
->index_data
.content_size
= data
->content_size
;
191 index
->index_data
.timestamp_begin
= data
->timestamp_begin
;
192 index
->index_data
.timestamp_end
= data
->timestamp_end
;
193 index
->index_data
.events_discarded
= data
->events_discarded
;
194 index
->index_data
.stream_id
= data
->stream_id
;
195 index
->has_index_data
= true;
197 pthread_mutex_unlock(&index
->lock
);
/*
 * Release the memory of a relay index.
 *
 * NOTE(review): presumably only reached through index_destroy_rcu(), once
 * the index has been removed from the hash table - confirm.
 */
static void index_destroy(struct relay_index *index)
{
	free(index);
}
206 static void index_destroy_rcu(struct rcu_head
*rcu_head
)
208 struct relay_index
*index
= lttng::utils::container_of(rcu_head
, &relay_index::rcu_node
);
210 index_destroy(index
);
213 /* Stream lock must be held by the caller. */
214 static void index_release(struct urcu_ref
*ref
)
216 struct relay_index
*index
= lttng::utils::container_of(ref
, &relay_index::ref
);
217 struct relay_stream
*stream
= index
->stream
;
219 struct lttng_ht_iter iter
;
221 if (index
->index_file
) {
222 lttng_index_file_put(index
->index_file
);
223 index
->index_file
= nullptr;
225 if (index
->in_hash_table
) {
226 /* Delete index from hash table. */
227 iter
.iter
.node
= &index
->index_n
.node
;
228 ret
= lttng_ht_del(stream
->indexes_ht
, &iter
);
230 stream
->indexes_in_flight
--;
233 stream_put(index
->stream
);
234 index
->stream
= nullptr;
236 call_rcu(&index
->rcu_node
, index_destroy_rcu
);
240 * Called with stream mutex held.
242 * Stream lock must be held by the caller.
244 void relay_index_put(struct relay_index
*index
)
246 DBG2("index put for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
247 index
->stream
->stream_handle
,
249 (int) index
->ref
.refcount
);
251 * Ensure existence of index->lock for index unlock.
253 const lttng::urcu::read_lock_guard read_lock
;
255 * Index lock ensures that concurrent test and update of stream
258 LTTNG_ASSERT(index
->ref
.refcount
!= 0);
259 urcu_ref_put(&index
->ref
, index_release
);
263 * Try to flush index to disk. Releases self-reference to index once
266 * Stream lock must be held by the caller.
267 * Return 0 on successful flush, a negative value on error, or positive
268 * value if no flush was performed.
270 int relay_index_try_flush(struct relay_index
*index
)
273 bool flushed
= false;
275 pthread_mutex_lock(&index
->lock
);
276 if (index
->flushed
) {
279 /* Check if we are ready to flush. */
280 if (!index
->has_index_data
|| !index
->index_file
) {
284 DBG2("Writing index for stream ID %" PRIu64
" and seq num %" PRIu64
,
285 index
->stream
->stream_handle
,
288 index
->flushed
= true;
289 ret
= lttng_index_file_write(index
->index_file
, &index
->index_data
);
291 pthread_mutex_unlock(&index
->lock
);
294 /* Put self-ref from index now that it has been flushed. */
295 relay_index_put(index
);
301 * Close every relay index within a given stream, without flushing
304 void relay_index_close_all(struct relay_stream
*stream
)
307 lttng::urcu::lfht_iteration_adapter
<relay_index
,
308 decltype(relay_index::index_n
),
309 &relay_index::index_n
>(*stream
->indexes_ht
->ht
)) {
310 /* Put self-ref from index. */
311 relay_index_put(index
);
315 void relay_index_close_partial_fd(struct relay_stream
*stream
)
318 lttng::urcu::lfht_iteration_adapter
<relay_index
,
319 decltype(relay_index::index_n
),
320 &relay_index::index_n
>(*stream
->indexes_ht
->ht
)) {
321 if (!index
->index_file
) {
325 * Partial index has its index_file: we have only
326 * received its info from the data socket.
327 * Put self-ref from index.
329 relay_index_put(index
);
333 uint64_t relay_index_find_last(struct relay_stream
*stream
)
335 uint64_t net_seq_num
= -1ULL;
338 lttng::urcu::lfht_iteration_adapter
<relay_index
,
339 decltype(relay_index::index_n
),
340 &relay_index::index_n
>(*stream
->indexes_ht
->ht
)) {
341 if (net_seq_num
== -1ULL || index
->index_n
.key
> net_seq_num
) {
342 net_seq_num
= index
->index_n
.key
;
350 * Update the index file of an already existing relay_index.
351 * Offsets by 'removed_data_count' the offset field of an index.
353 static int relay_index_switch_file(struct relay_index
*index
,
354 struct lttng_index_file
*new_index_file
,
355 uint64_t removed_data_count
)
360 pthread_mutex_lock(&index
->lock
);
361 if (!index
->index_file
) {
362 ERR("No index_file");
367 lttng_index_file_put(index
->index_file
);
368 lttng_index_file_get(new_index_file
);
369 index
->index_file
= new_index_file
;
370 offset
= be64toh(index
->index_data
.offset
);
371 index
->index_data
.offset
= htobe64(offset
- removed_data_count
);
374 pthread_mutex_unlock(&index
->lock
);
379 * Switch the index file of all pending indexes for a stream and update the
380 * data offset by substracting the last safe position.
381 * Stream lock must be held.
383 int relay_index_switch_all_files(struct relay_stream
*stream
)
386 lttng::urcu::lfht_iteration_adapter
<relay_index
,
387 decltype(relay_index::index_n
),
388 &relay_index::index_n
>(*stream
->indexes_ht
->ht
)) {
389 const auto ret
= relay_index_switch_file(
390 index
, stream
->index_file
, stream
->pos_after_last_complete_data_index
);
400 * Set index data from the control port to a given index object.
402 int relay_index_set_control_data(struct relay_index
*index
,
403 const struct lttcomm_relayd_index
*data
,
404 unsigned int minor_version
)
406 /* The index on disk is encoded in big endian. */
407 ctf_packet_index index_data
{};
409 index_data
.packet_size
= htobe64(data
->packet_size
);
410 index_data
.content_size
= htobe64(data
->content_size
);
411 index_data
.timestamp_begin
= htobe64(data
->timestamp_begin
);
412 index_data
.timestamp_end
= htobe64(data
->timestamp_end
);
413 index_data
.events_discarded
= htobe64(data
->events_discarded
);
414 index_data
.stream_id
= htobe64(data
->stream_id
);
416 if (minor_version
>= 8) {
417 index
->index_data
.stream_instance_id
= htobe64(data
->stream_instance_id
);
418 index
->index_data
.packet_seq_num
= htobe64(data
->packet_seq_num
);
420 const uint64_t unset_value
= -1ULL;
422 index
->index_data
.stream_instance_id
= htobe64(unset_value
);
423 index
->index_data
.packet_seq_num
= htobe64(unset_value
);
426 return relay_index_set_data(index
, &index_data
);