b15bbcd7702954e675a98c06e02ab31058ff7499
/*
 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
 *                      David Goulet <dgoulet@efficios.com>
 *               2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License, version 2 only, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <assert.h>
#include <inttypes.h>
#include <stdlib.h>

#include <common/common.h>
#include <common/utils.h>

#include "lttng-relayd.h"
31 * Allocate a new relay index object. Pass the stream in which it is
32 * contained as parameter. The sequence number will be used as the hash
35 * Called with stream mutex held.
36 * Return allocated object or else NULL on error.
38 static struct relay_index
*relay_index_create(struct relay_stream
*stream
,
41 struct relay_index
*index
;
43 DBG2("Creating relay index for stream id %" PRIu64
" and seqnum %" PRIu64
,
44 stream
->stream_handle
, net_seq_num
);
46 index
= zmalloc(sizeof(*index
));
48 PERROR("Relay index zmalloc");
51 if (!stream_get(stream
)) {
52 ERR("Cannot get stream");
57 index
->stream
= stream
;
59 lttng_ht_node_init_u64(&index
->index_n
, net_seq_num
);
60 pthread_mutex_init(&index
->lock
, NULL
);
61 pthread_mutex_init(&index
->reflock
, NULL
);
62 urcu_ref_init(&index
->ref
);
69 * Add unique relay index to the given hash table. In case of a collision, the
70 * already existing object is put in the given _index variable.
72 * RCU read side lock MUST be acquired.
74 static struct relay_index
*relay_index_add_unique(struct relay_stream
*stream
,
75 struct relay_index
*index
)
77 struct cds_lfht_node
*node_ptr
;
78 struct relay_index
*_index
;
80 DBG2("Adding relay index with stream id %" PRIu64
" and seqnum %" PRIu64
,
81 stream
->stream_handle
, index
->index_n
.key
);
83 node_ptr
= cds_lfht_add_unique(stream
->indexes_ht
->ht
,
84 stream
->indexes_ht
->hash_fct(&index
->index_n
, lttng_ht_seed
),
85 stream
->indexes_ht
->match_fct
, &index
->index_n
,
86 &index
->index_n
.node
);
87 if (node_ptr
!= &index
->index_n
.node
) {
88 _index
= caa_container_of(node_ptr
, struct relay_index
,
97 * Should be called with RCU read-side lock held.
99 static bool relay_index_get(struct relay_index
*index
)
101 bool has_ref
= false;
103 DBG2("index get for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
104 index
->stream
->stream_handle
, index
->index_n
.key
,
105 (int) index
->ref
.refcount
);
107 /* Confirm that the index refcount has not reached 0. */
108 pthread_mutex_lock(&index
->reflock
);
109 if (index
->ref
.refcount
!= 0) {
111 urcu_ref_get(&index
->ref
);
113 pthread_mutex_unlock(&index
->reflock
);
119 * Get a relayd index in within the given stream, or create it if not
122 * Called with stream mutex held.
123 * Return index object or else NULL on error.
125 struct relay_index
*relay_index_get_by_id_or_create(struct relay_stream
*stream
,
126 uint64_t net_seq_num
)
128 struct lttng_ht_node_u64
*node
;
129 struct lttng_ht_iter iter
;
130 struct relay_index
*index
= NULL
;
132 DBG3("Finding index for stream id %" PRIu64
" and seq_num %" PRIu64
,
133 stream
->stream_handle
, net_seq_num
);
136 lttng_ht_lookup(stream
->indexes_ht
, &net_seq_num
, &iter
);
137 node
= lttng_ht_iter_get_node_u64(&iter
);
139 index
= caa_container_of(node
, struct relay_index
, index_n
);
141 struct relay_index
*oldindex
;
143 index
= relay_index_create(stream
, net_seq_num
);
145 ERR("Cannot create index for stream id %" PRIu64
" and seq_num %" PRIu64
,
146 stream
->stream_handle
, net_seq_num
);
149 oldindex
= relay_index_add_unique(stream
, index
);
151 /* Added concurrently, keep old. */
152 relay_index_put(index
);
154 if (!relay_index_get(index
)) {
158 stream
->indexes_in_flight
++;
159 index
->in_hash_table
= true;
164 DBG2("Index %sfound or created in HT for stream ID %" PRIu64
" and seqnum %" PRIu64
,
165 (index
== NULL
) ? "NOT " : "", stream
->stream_handle
, net_seq_num
);
169 int relay_index_set_file(struct relay_index
*index
,
170 struct lttng_index_file
*index_file
,
171 uint64_t data_offset
)
175 pthread_mutex_lock(&index
->lock
);
176 if (index
->index_file
) {
180 lttng_index_file_get(index_file
);
181 index
->index_file
= index_file
;
182 index
->index_data
.offset
= data_offset
;
184 pthread_mutex_unlock(&index
->lock
);
188 int relay_index_set_data(struct relay_index
*index
,
189 const struct ctf_packet_index
*data
)
193 pthread_mutex_lock(&index
->lock
);
194 if (index
->has_index_data
) {
198 /* Set everything except data_offset. */
199 index
->index_data
.packet_size
= data
->packet_size
;
200 index
->index_data
.content_size
= data
->content_size
;
201 index
->index_data
.timestamp_begin
= data
->timestamp_begin
;
202 index
->index_data
.timestamp_end
= data
->timestamp_end
;
203 index
->index_data
.events_discarded
= data
->events_discarded
;
204 index
->index_data
.stream_id
= data
->stream_id
;
205 index
->has_index_data
= true;
207 pthread_mutex_unlock(&index
->lock
);
/*
 * Free the memory of an index object.
 */
static void index_destroy(struct relay_index *index)
{
	free(index);
}
216 static void index_destroy_rcu(struct rcu_head
*rcu_head
)
218 struct relay_index
*index
=
219 caa_container_of(rcu_head
, struct relay_index
, rcu_node
);
221 index_destroy(index
);
224 /* Stream lock must be held by the caller. */
225 static void index_release(struct urcu_ref
*ref
)
227 struct relay_index
*index
= caa_container_of(ref
, struct relay_index
, ref
);
228 struct relay_stream
*stream
= index
->stream
;
230 struct lttng_ht_iter iter
;
232 if (index
->index_file
) {
233 lttng_index_file_put(index
->index_file
);
234 index
->index_file
= NULL
;
236 if (index
->in_hash_table
) {
237 /* Delete index from hash table. */
238 iter
.iter
.node
= &index
->index_n
.node
;
239 ret
= lttng_ht_del(stream
->indexes_ht
, &iter
);
241 stream
->indexes_in_flight
--;
244 stream_put(index
->stream
);
245 index
->stream
= NULL
;
247 call_rcu(&index
->rcu_node
, index_destroy_rcu
);
251 * Called with stream mutex held.
253 * Stream lock must be held by the caller.
255 void relay_index_put(struct relay_index
*index
)
257 DBG2("index put for stream id %" PRIu64
" and seqnum %" PRIu64
" refcount %d",
258 index
->stream
->stream_handle
, index
->index_n
.key
,
259 (int) index
->ref
.refcount
);
261 * Ensure existance of index->lock for index unlock.
265 * Index lock ensures that concurrent test and update of stream
268 pthread_mutex_lock(&index
->reflock
);
269 assert(index
->ref
.refcount
!= 0);
270 urcu_ref_put(&index
->ref
, index_release
);
271 pthread_mutex_unlock(&index
->reflock
);
276 * Try to flush index to disk. Releases self-reference to index once
279 * Stream lock must be held by the caller.
280 * Return 0 on successful flush, a negative value on error, or positive
281 * value if no flush was performed.
283 int relay_index_try_flush(struct relay_index
*index
)
286 bool flushed
= false;
289 pthread_mutex_lock(&index
->lock
);
290 if (index
->flushed
) {
293 /* Check if we are ready to flush. */
294 if (!index
->has_index_data
|| !index
->index_file
) {
297 fd
= index
->index_file
->fd
;
298 DBG2("Writing index for stream ID %" PRIu64
" and seq num %" PRIu64
299 " on fd %d", index
->stream
->stream_handle
,
300 index
->index_n
.key
, fd
);
302 index
->flushed
= true;
303 ret
= lttng_index_file_write(index
->index_file
, &index
->index_data
);
305 pthread_mutex_unlock(&index
->lock
);
308 /* Put self-ref from index now that it has been flushed. */
309 relay_index_put(index
);
315 * Close every relay index within a given stream, without flushing
318 void relay_index_close_all(struct relay_stream
*stream
)
320 struct lttng_ht_iter iter
;
321 struct relay_index
*index
;
324 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
325 index
, index_n
.node
) {
326 /* Put self-ref from index. */
327 relay_index_put(index
);
332 void relay_index_close_partial_fd(struct relay_stream
*stream
)
334 struct lttng_ht_iter iter
;
335 struct relay_index
*index
;
338 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
339 index
, index_n
.node
) {
340 if (!index
->index_file
) {
344 * Partial index has its index_file: we have only
345 * received its info from the data socket.
346 * Put self-ref from index.
348 relay_index_put(index
);
353 uint64_t relay_index_find_last(struct relay_stream
*stream
)
355 struct lttng_ht_iter iter
;
356 struct relay_index
*index
;
357 uint64_t net_seq_num
= -1ULL;
360 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
,
361 index
, index_n
.node
) {
362 if (net_seq_num
== -1ULL ||
363 index
->index_n
.key
> net_seq_num
) {
364 net_seq_num
= index
->index_n
.key
;
This page took 0.036805 seconds and 4 git commands to generate.