2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 #include <common/common.h>
24 #include <common/utils.h>
25 #include <common/compat/endian.h>
27 #include "lttng-relayd.h"
30 #include "connection.h"
33 * Allocate a new relay index object. Pass the stream in which it is
34 * contained as parameter. The sequence number will be used as the hash
37 * Called with stream mutex held.
38 * Return allocated object or else NULL on error.
40 static struct relay_index
*relay_index_create(struct relay_stream
*stream
,
43 struct relay_index
*index
;
45 DBG2("Creating relay index for stream id %" PRIu64
" and seqnum %" PRIu64
,
46 stream
->stream_handle
, net_seq_num
);
48 index
= zmalloc(sizeof(*index
));
50 PERROR("Relay index zmalloc");
53 if (!stream_get(stream
)) {
54 ERR("Cannot get stream");
59 index
->stream
= stream
;
61 lttng_ht_node_init_u64(&index
->index_n
, net_seq_num
);
62 pthread_mutex_init(&index
->lock
, NULL
);
63 urcu_ref_init(&index
->ref
);
70 * Add unique relay index to the given hash table. In case of a collision, the
71 * already existing object is put in the given _index variable.
73 * RCU read side lock MUST be acquired.
75 static struct relay_index
*relay_index_add_unique(struct relay_stream
*stream
,
76 struct relay_index
*index
)
78 struct cds_lfht_node
*node_ptr
;
79 struct relay_index
*_index
;
81 DBG2("Adding relay index with stream id %" PRIu64
" and seqnum %" PRIu64
,
82 stream
->stream_handle
, index
->index_n
.key
);
84 node_ptr
= cds_lfht_add_unique(stream
->indexes_ht
->ht
,
85 stream
->indexes_ht
->hash_fct(&index
->index_n
, lttng_ht_seed
),
86 stream
->indexes_ht
->match_fct
, &index
->index_n
,
87 &index
->index_n
.node
);
88 if (node_ptr
!= &index
->index_n
.node
) {
89 _index
= caa_container_of(node_ptr
, struct relay_index
,
98 * Should be called with RCU read-side lock held.
100 static bool relay_index_get(struct relay_index
*index
)
102 DBG2("index get for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
103 index
->stream
->stream_handle
, index
->index_n
.key
,
104 (int) index
->ref
.refcount
);
106 return urcu_ref_get_unless_zero(&index
->ref
);
110 * Get a relayd index within the given stream, or create it if not
113 * Called with stream mutex held.
114 * Return index object or else NULL on error.
116 struct relay_index
*relay_index_get_by_id_or_create(struct relay_stream
*stream
,
117 uint64_t net_seq_num
)
119 struct lttng_ht_node_u64
*node
;
120 struct lttng_ht_iter iter
;
121 struct relay_index
*index
= NULL
;
123 DBG3("Finding index for stream id %" PRIu64
" and seq_num %" PRIu64
,
124 stream
->stream_handle
, net_seq_num
);
127 lttng_ht_lookup(stream
->indexes_ht
, &net_seq_num
, &iter
);
128 node
= lttng_ht_iter_get_node_u64(&iter
);
130 index
= caa_container_of(node
, struct relay_index
, index_n
);
132 struct relay_index
*oldindex
;
134 index
= relay_index_create(stream
, net_seq_num
);
136 ERR("Cannot create index for stream id %" PRIu64
" and seq_num %" PRIu64
,
137 stream
->stream_handle
, net_seq_num
);
140 oldindex
= relay_index_add_unique(stream
, index
);
142 /* Added concurrently, keep old. */
143 relay_index_put(index
);
145 if (!relay_index_get(index
)) {
149 stream
->indexes_in_flight
++;
150 index
->in_hash_table
= true;
155 DBG2("Index %sfound or created in HT for stream ID %" PRIu64
" and seqnum %" PRIu64
,
156 (index
== NULL
) ? "NOT " : "", stream
->stream_handle
, net_seq_num
);
160 int relay_index_set_file(struct relay_index
*index
,
161 struct lttng_index_file
*index_file
,
162 uint64_t data_offset
)
166 pthread_mutex_lock(&index
->lock
);
167 if (index
->index_file
) {
171 lttng_index_file_get(index_file
);
172 index
->index_file
= index_file
;
173 index
->index_data
.offset
= data_offset
;
175 pthread_mutex_unlock(&index
->lock
);
179 int relay_index_set_data(struct relay_index
*index
,
180 const struct ctf_packet_index
*data
)
184 pthread_mutex_lock(&index
->lock
);
185 if (index
->has_index_data
) {
189 /* Set everything except data_offset. */
190 index
->index_data
.packet_size
= data
->packet_size
;
191 index
->index_data
.content_size
= data
->content_size
;
192 index
->index_data
.timestamp_begin
= data
->timestamp_begin
;
193 index
->index_data
.timestamp_end
= data
->timestamp_end
;
194 index
->index_data
.events_discarded
= data
->events_discarded
;
195 index
->index_data
.stream_id
= data
->stream_id
;
196 index
->has_index_data
= true;
198 pthread_mutex_unlock(&index
->lock
);
202 static void index_destroy(struct relay_index
*index
)
207 static void index_destroy_rcu(struct rcu_head
*rcu_head
)
209 struct relay_index
*index
=
210 caa_container_of(rcu_head
, struct relay_index
, rcu_node
);
212 index_destroy(index
);
215 /* Stream lock must be held by the caller. */
216 static void index_release(struct urcu_ref
*ref
)
218 struct relay_index
*index
= caa_container_of(ref
, struct relay_index
, ref
);
219 struct relay_stream
*stream
= index
->stream
;
221 struct lttng_ht_iter iter
;
223 if (index
->index_file
) {
224 lttng_index_file_put(index
->index_file
);
225 index
->index_file
= NULL
;
227 if (index
->in_hash_table
) {
228 /* Delete index from hash table. */
229 iter
.iter
.node
= &index
->index_n
.node
;
230 ret
= lttng_ht_del(stream
->indexes_ht
, &iter
);
232 stream
->indexes_in_flight
--;
235 stream_put(index
->stream
);
236 index
->stream
= NULL
;
238 call_rcu(&index
->rcu_node
, index_destroy_rcu
);
242 * Called with stream mutex held.
244 * Stream lock must be held by the caller.
246 void relay_index_put(struct relay_index
*index
)
248 DBG2("index put for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
249 index
->stream
->stream_handle
, index
->index_n
.key
,
250 (int) index
->ref
.refcount
);
252 * Ensure existence of index->lock for index unlock.
256 * Index lock ensures that concurrent test and update of stream
259 assert(index
->ref
.refcount
!= 0);
260 urcu_ref_put(&index
->ref
, index_release
);
265 * Try to flush index to disk. Releases self-reference to index once
268 * Stream lock must be held by the caller.
269 * Return 0 on successful flush, a negative value on error, or positive
270 * value if no flush was performed.
272 int relay_index_try_flush(struct relay_index
*index
)
275 bool flushed
= false;
278 pthread_mutex_lock(&index
->lock
);
279 if (index
->flushed
) {
282 /* Check if we are ready to flush. */
283 if (!index
->has_index_data
|| !index
->index_file
) {
286 fd
= index
->index_file
->fd
;
287 DBG2("Writing index for stream ID %" PRIu64
" and seq num %" PRIu64
288 " on fd %d", index
->stream
->stream_handle
,
289 index
->index_n
.key
, fd
);
291 index
->flushed
= true;
292 ret
= lttng_index_file_write(index
->index_file
, &index
->index_data
);
294 pthread_mutex_unlock(&index
->lock
);
297 /* Put self-ref from index now that it has been flushed. */
298 relay_index_put(index
);
304 * Close every relay index within a given stream, without flushing
307 void relay_index_close_all(struct relay_stream
*stream
)
309 struct lttng_ht_iter iter
;
310 struct relay_index
*index
;
313 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
314 index
, index_n
.node
) {
315 /* Put self-ref from index. */
316 relay_index_put(index
);
321 void relay_index_close_partial_fd(struct relay_stream
*stream
)
323 struct lttng_ht_iter iter
;
324 struct relay_index
*index
;
327 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
328 index
, index_n
.node
) {
329 if (!index
->index_file
) {
333 * Partial index has its index_file: we have only
334 * received its info from the data socket.
335 * Put self-ref from index.
337 relay_index_put(index
);
342 uint64_t relay_index_find_last(struct relay_stream
*stream
)
344 struct lttng_ht_iter iter
;
345 struct relay_index
*index
;
346 uint64_t net_seq_num
= -1ULL;
349 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
350 index
, index_n
.node
) {
351 if (net_seq_num
== -1ULL ||
352 index
->index_n
.key
> net_seq_num
) {
353 net_seq_num
= index
->index_n
.key
;
361 * Update the index file of an already existing relay_index.
362 * Offsets by 'removed_data_count' the offset field of an index.
365 int relay_index_switch_file(struct relay_index
*index
,
366 struct lttng_index_file
*new_index_file
,
367 uint64_t removed_data_count
)
372 pthread_mutex_lock(&index
->lock
);
373 if (!index
->index_file
) {
374 ERR("No index_file");
379 lttng_index_file_put(index
->index_file
);
380 lttng_index_file_get(new_index_file
);
381 index
->index_file
= new_index_file
;
382 offset
= be64toh(index
->index_data
.offset
);
383 index
->index_data
.offset
= htobe64(offset
- removed_data_count
);
386 pthread_mutex_unlock(&index
->lock
);
391 * Switch the index file of all pending indexes for a stream and update the
392 * data offset by subtracting the last safe position.
393 * Stream lock must be held.
395 int relay_index_switch_all_files(struct relay_stream
*stream
)
397 struct lttng_ht_iter iter
;
398 struct relay_index
*index
;
402 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
403 index
, index_n
.node
) {
404 DBG("Update index to fd %d", stream
->index_file
->fd
);
405 ret
= relay_index_switch_file(index
, stream
->index_file
,
406 stream
->pos_after_last_complete_data_index
);
417 * Set index data from the control port to a given index object.
419 int relay_index_set_control_data(struct relay_index
*index
,
420 const struct lttcomm_relayd_index
*data
,
421 unsigned int minor_version
)
423 /* The index on disk is encoded in big endian. */
424 const struct ctf_packet_index index_data
= {
425 .packet_size
= htobe64(data
->packet_size
),
426 .content_size
= htobe64(data
->content_size
),
427 .timestamp_begin
= htobe64(data
->timestamp_begin
),
428 .timestamp_end
= htobe64(data
->timestamp_end
),
429 .events_discarded
= htobe64(data
->events_discarded
),
430 .stream_id
= htobe64(data
->stream_id
),
433 if (minor_version
>= 8) {
434 index
->index_data
.stream_instance_id
= htobe64(data
->stream_instance_id
);
435 index
->index_data
.packet_seq_num
= htobe64(data
->packet_seq_num
);
438 return relay_index_set_data(index
, &index_data
);