Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-relayd / index.cpp
... / ...
CommitLineData
1/*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10#define _LGPL_SOURCE
11
12#include "connection.hpp"
13#include "index.hpp"
14#include "lttng-relayd.hpp"
15#include "stream.hpp"
16
17#include <common/common.hpp>
18#include <common/compat/endian.hpp>
19#include <common/urcu.hpp>
20#include <common/utils.hpp>
21
22/*
23 * Allocate a new relay index object. Pass the stream in which it is
24 * contained as parameter. The sequence number will be used as the hash
25 * table key.
26 *
27 * Called with stream mutex held.
28 * Return allocated object or else NULL on error.
29 */
30static struct relay_index *relay_index_create(struct relay_stream *stream, uint64_t net_seq_num)
31{
32 struct relay_index *index;
33
34 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
35 stream->stream_handle,
36 net_seq_num);
37
38 index = zmalloc<relay_index>();
39 if (!index) {
40 PERROR("Relay index zmalloc");
41 goto end;
42 }
43 if (!stream_get(stream)) {
44 ERR("Cannot get stream");
45 free(index);
46 index = nullptr;
47 goto end;
48 }
49 index->stream = stream;
50
51 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
52 pthread_mutex_init(&index->lock, nullptr);
53 urcu_ref_init(&index->ref);
54
55end:
56 return index;
57}
58
59/*
60 * Add unique relay index to the given hash table. In case of a collision, the
61 * already existing object is put in the given _index variable.
62 *
63 * RCU read side lock MUST be acquired.
64 */
65static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
66 struct relay_index *index)
67{
68 struct cds_lfht_node *node_ptr;
69 struct relay_index *_index;
70
71 ASSERT_RCU_READ_LOCKED();
72
73 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
74 stream->stream_handle,
75 index->index_n.key);
76
77 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
78 stream->indexes_ht->hash_fct(&index->index_n.key,
79 lttng_ht_seed),
80 stream->indexes_ht->match_fct,
81 &index->index_n.key,
82 &index->index_n.node);
83 if (node_ptr != &index->index_n.node) {
84 _index = lttng_ht_node_container_of(node_ptr, &relay_index::index_n);
85 } else {
86 _index = nullptr;
87 }
88 return _index;
89}
90
91/*
92 * Should be called with RCU read-side lock held.
93 */
94static bool relay_index_get(struct relay_index *index)
95{
96 ASSERT_RCU_READ_LOCKED();
97
98 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
99 index->stream->stream_handle,
100 index->index_n.key,
101 (int) index->ref.refcount);
102
103 return urcu_ref_get_unless_zero(&index->ref);
104}
105
106/*
107 * Get a relayd index in within the given stream, or create it if not
108 * present.
109 *
110 * Called with stream mutex held.
111 * Return index object or else NULL on error.
112 */
113struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
114 uint64_t net_seq_num)
115{
116 struct lttng_ht_node_u64 *node;
117 struct lttng_ht_iter iter;
118 struct relay_index *index = nullptr;
119
120 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
121 stream->stream_handle,
122 net_seq_num);
123
124 const lttng::urcu::read_lock_guard read_lock;
125 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
126 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
127 if (node) {
128 index = lttng::utils::container_of(node, &relay_index::index_n);
129 } else {
130 struct relay_index *oldindex;
131
132 index = relay_index_create(stream, net_seq_num);
133 if (!index) {
134 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
135 stream->stream_handle,
136 net_seq_num);
137 goto end;
138 }
139 oldindex = relay_index_add_unique(stream, index);
140 if (oldindex) {
141 /* Added concurrently, keep old. */
142 relay_index_put(index);
143 index = oldindex;
144 if (!relay_index_get(index)) {
145 index = nullptr;
146 }
147 } else {
148 stream->indexes_in_flight++;
149 index->in_hash_table = true;
150 }
151 }
152end:
153 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
154 (index == NULL) ? "NOT " : "",
155 stream->stream_handle,
156 net_seq_num);
157 return index;
158}
159
160int relay_index_set_file(struct relay_index *index,
161 struct lttng_index_file *index_file,
162 uint64_t data_offset)
163{
164 int ret = 0;
165
166 pthread_mutex_lock(&index->lock);
167 if (index->index_file) {
168 ret = -1;
169 goto end;
170 }
171 lttng_index_file_get(index_file);
172 index->index_file = index_file;
173 index->index_data.offset = data_offset;
174end:
175 pthread_mutex_unlock(&index->lock);
176 return ret;
177}
178
179int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data)
180{
181 int ret = 0;
182
183 pthread_mutex_lock(&index->lock);
184 if (index->has_index_data) {
185 ret = -1;
186 goto end;
187 }
188 /* Set everything except data_offset. */
189 index->index_data.packet_size = data->packet_size;
190 index->index_data.content_size = data->content_size;
191 index->index_data.timestamp_begin = data->timestamp_begin;
192 index->index_data.timestamp_end = data->timestamp_end;
193 index->index_data.events_discarded = data->events_discarded;
194 index->index_data.stream_id = data->stream_id;
195 index->has_index_data = true;
196end:
197 pthread_mutex_unlock(&index->lock);
198 return ret;
199}
200
201static void index_destroy(struct relay_index *index)
202{
203 free(index);
204}
205
206static void index_destroy_rcu(struct rcu_head *rcu_head)
207{
208 struct relay_index *index = lttng::utils::container_of(rcu_head, &relay_index::rcu_node);
209
210 index_destroy(index);
211}
212
213/* Stream lock must be held by the caller. */
214static void index_release(struct urcu_ref *ref)
215{
216 struct relay_index *index = lttng::utils::container_of(ref, &relay_index::ref);
217 struct relay_stream *stream = index->stream;
218 int ret;
219 struct lttng_ht_iter iter;
220
221 if (index->index_file) {
222 lttng_index_file_put(index->index_file);
223 index->index_file = nullptr;
224 }
225 if (index->in_hash_table) {
226 /* Delete index from hash table. */
227 iter.iter.node = &index->index_n.node;
228 ret = lttng_ht_del(stream->indexes_ht, &iter);
229 LTTNG_ASSERT(!ret);
230 stream->indexes_in_flight--;
231 }
232
233 stream_put(index->stream);
234 index->stream = nullptr;
235
236 call_rcu(&index->rcu_node, index_destroy_rcu);
237}
238
239/*
240 * Called with stream mutex held.
241 *
242 * Stream lock must be held by the caller.
243 */
244void relay_index_put(struct relay_index *index)
245{
246 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
247 index->stream->stream_handle,
248 index->index_n.key,
249 (int) index->ref.refcount);
250 /*
251 * Ensure existence of index->lock for index unlock.
252 */
253 const lttng::urcu::read_lock_guard read_lock;
254 /*
255 * Index lock ensures that concurrent test and update of stream
256 * ref is atomic.
257 */
258 LTTNG_ASSERT(index->ref.refcount != 0);
259 urcu_ref_put(&index->ref, index_release);
260}
261
262/*
263 * Try to flush index to disk. Releases self-reference to index once
264 * flush succeeds.
265 *
266 * Stream lock must be held by the caller.
267 * Return 0 on successful flush, a negative value on error, or positive
268 * value if no flush was performed.
269 */
270int relay_index_try_flush(struct relay_index *index)
271{
272 int ret = 1;
273 bool flushed = false;
274
275 pthread_mutex_lock(&index->lock);
276 if (index->flushed) {
277 goto skip;
278 }
279 /* Check if we are ready to flush. */
280 if (!index->has_index_data || !index->index_file) {
281 goto skip;
282 }
283
284 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
285 index->stream->stream_handle,
286 index->index_n.key);
287 flushed = true;
288 index->flushed = true;
289 ret = lttng_index_file_write(index->index_file, &index->index_data);
290skip:
291 pthread_mutex_unlock(&index->lock);
292
293 if (flushed) {
294 /* Put self-ref from index now that it has been flushed. */
295 relay_index_put(index);
296 }
297 return ret;
298}
299
300/*
301 * Close every relay index within a given stream, without flushing
302 * them.
303 */
304void relay_index_close_all(struct relay_stream *stream)
305{
306 for (auto *index :
307 lttng::urcu::lfht_iteration_adapter<relay_index,
308 decltype(relay_index::index_n),
309 &relay_index::index_n>(*stream->indexes_ht->ht)) {
310 /* Put self-ref from index. */
311 relay_index_put(index);
312 }
313}
314
315void relay_index_close_partial_fd(struct relay_stream *stream)
316{
317 for (auto *index :
318 lttng::urcu::lfht_iteration_adapter<relay_index,
319 decltype(relay_index::index_n),
320 &relay_index::index_n>(*stream->indexes_ht->ht)) {
321 if (!index->index_file) {
322 continue;
323 }
324 /*
325 * Partial index has its index_file: we have only
326 * received its info from the data socket.
327 * Put self-ref from index.
328 */
329 relay_index_put(index);
330 }
331}
332
333uint64_t relay_index_find_last(struct relay_stream *stream)
334{
335 uint64_t net_seq_num = -1ULL;
336
337 for (auto *index :
338 lttng::urcu::lfht_iteration_adapter<relay_index,
339 decltype(relay_index::index_n),
340 &relay_index::index_n>(*stream->indexes_ht->ht)) {
341 if (net_seq_num == -1ULL || index->index_n.key > net_seq_num) {
342 net_seq_num = index->index_n.key;
343 }
344 }
345
346 return net_seq_num;
347}
348
349/*
350 * Update the index file of an already existing relay_index.
351 * Offsets by 'removed_data_count' the offset field of an index.
352 */
353static int relay_index_switch_file(struct relay_index *index,
354 struct lttng_index_file *new_index_file,
355 uint64_t removed_data_count)
356{
357 int ret = 0;
358 uint64_t offset;
359
360 pthread_mutex_lock(&index->lock);
361 if (!index->index_file) {
362 ERR("No index_file");
363 ret = 0;
364 goto end;
365 }
366
367 lttng_index_file_put(index->index_file);
368 lttng_index_file_get(new_index_file);
369 index->index_file = new_index_file;
370 offset = be64toh(index->index_data.offset);
371 index->index_data.offset = htobe64(offset - removed_data_count);
372
373end:
374 pthread_mutex_unlock(&index->lock);
375 return ret;
376}
377
378/*
379 * Switch the index file of all pending indexes for a stream and update the
380 * data offset by substracting the last safe position.
381 * Stream lock must be held.
382 */
383int relay_index_switch_all_files(struct relay_stream *stream)
384{
385 for (auto *index :
386 lttng::urcu::lfht_iteration_adapter<relay_index,
387 decltype(relay_index::index_n),
388 &relay_index::index_n>(*stream->indexes_ht->ht)) {
389 const auto ret = relay_index_switch_file(
390 index, stream->index_file, stream->pos_after_last_complete_data_index);
391 if (ret) {
392 return ret;
393 }
394 }
395
396 return 0;
397}
398
399/*
400 * Set index data from the control port to a given index object.
401 */
402int relay_index_set_control_data(struct relay_index *index,
403 const struct lttcomm_relayd_index *data,
404 unsigned int minor_version)
405{
406 /* The index on disk is encoded in big endian. */
407 ctf_packet_index index_data{};
408
409 index_data.packet_size = htobe64(data->packet_size);
410 index_data.content_size = htobe64(data->content_size);
411 index_data.timestamp_begin = htobe64(data->timestamp_begin);
412 index_data.timestamp_end = htobe64(data->timestamp_end);
413 index_data.events_discarded = htobe64(data->events_discarded);
414 index_data.stream_id = htobe64(data->stream_id);
415
416 if (minor_version >= 8) {
417 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
418 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
419 } else {
420 const uint64_t unset_value = -1ULL;
421
422 index->index_data.stream_instance_id = htobe64(unset_value);
423 index->index_data.packet_seq_num = htobe64(unset_value);
424 }
425
426 return relay_index_set_data(index, &index_data);
427}
This page took 0.025151 seconds and 5 git commands to generate.