2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include <common/common.h>
23 #include <common/defaults.h>
24 #include <common/fs-handle.h>
25 #include <common/sessiond-comm/relayd.h>
26 #include <common/utils.h>
28 #include <urcu/rculist.h>
30 #include "lttng-relayd.h"
33 #include "viewer-stream.h"
35 #include <sys/types.h>
/* Size, in bytes, of the on-stack buffers used to copy misplaced data and to write padding. */
38 #define FILE_IO_STACK_BUFFER_SIZE 65536
40 /* Should be called with RCU read-side lock held. */
/*
 * Try to acquire a reference on `stream`.
 *
 * Returns the result of urcu_ref_get_unless_zero(): true when a reference
 * was taken, false when the refcount had already reached zero, in which
 * case the caller must not use the stream.
 */
41 bool stream_get(struct relay_stream
*stream
)
43 return urcu_ref_get_unless_zero(&stream
->ref
);
47 * Get stream from stream id from the streams hash table. Return stream
48 * if found else NULL. A stream reference is taken when a stream is
49 * returned. stream_put() must be called on that stream.
/*
 * Looks `stream_id` up in the global relay_streams_ht table; a DBG message
 * reports a miss. When a node is found, the embedding relay_stream is
 * recovered and a reference is taken through stream_get(). Presumably NULL
 * is returned when stream_get() fails because the stream is being released
 * (the body of that branch is not visible in this view).
 */
51 struct relay_stream
*stream_get_by_id(uint64_t stream_id
)
53 struct lttng_ht_node_u64
*node
;
54 struct lttng_ht_iter iter
;
55 struct relay_stream
*stream
= NULL
;
58 lttng_ht_lookup(relay_streams_ht
, &stream_id
, &iter
);
59 node
= lttng_ht_iter_get_node_u64(&iter
);
61 DBG("Relay stream %" PRIu64
" not found", stream_id
);
64 stream
= caa_container_of(node
, struct relay_stream
, node
);
65 if (!stream_get(stream
)) {
/*
 * Finalize the ongoing rotation of `stream`.
 *
 * Guarded by a pending next trace chunk, the tracefile array is reset and
 * committed at index_received_seqcount. The reference held on the current
 * trace chunk is put and ongoing_rotation.value.next_trace_chunk becomes
 * the stream's trace chunk; no new reference is taken here, so the one
 * acquired by stream_set_pending_rotation() appears to be handed over.
 * ongoing_rotation is then zero-initialized.
 *
 * NOTE(review): the end of the `if` body is not visible in this view;
 * confirm which of these statements are conditional.
 */
73 static void stream_complete_rotation(struct relay_stream
*stream
)
75 DBG("Rotation completed for stream %" PRIu64
, stream
->stream_handle
);
76 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
77 tracefile_array_reset(stream
->tfa
);
78 tracefile_array_commit_seq(stream
->tfa
,
79 stream
->index_received_seqcount
);
81 lttng_trace_chunk_put(stream
->trace_chunk
);
82 stream
->trace_chunk
= stream
->ongoing_rotation
.value
.next_trace_chunk
;
83 stream
->ongoing_rotation
= (typeof(stream
->ongoing_rotation
)) {};
/*
 * Open the data file of `stream` inside `trace_chunk` and return its
 * handle through `out_file`.
 *
 * The path is built by utils_stream_file_path() from the stream's
 * path_name, channel_name, tracefile_size and tracefile_current_index; the
 * file is opened with O_RDWR | O_CREAT | O_TRUNC and mode
 * S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP. When the on-disk ring buffer has
 * wrapped around, or when `force_unlink` is set, the existing file is
 * unlinked first so that live readers of the old file are not overwritten;
 * an unlink failure with errno == ENOENT is not treated as fatal.
 *
 * Must be called with stream->lock held (asserted).
 */
86 static int stream_create_data_output_file_from_trace_chunk(
87 struct relay_stream
*stream
,
88 struct lttng_trace_chunk
*trace_chunk
,
90 struct fs_handle
**out_file
)
93 char stream_path
[LTTNG_PATH_MAX
];
94 enum lttng_trace_chunk_status status
;
95 const int flags
= O_RDWR
| O_CREAT
| O_TRUNC
;
96 const mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
;
98 ASSERT_LOCKED(stream
->lock
);
100 ret
= utils_stream_file_path(stream
->path_name
, stream
->channel_name
,
101 stream
->tracefile_size
, stream
->tracefile_current_index
,
102 NULL
, stream_path
, sizeof(stream_path
));
107 if (stream
->tracefile_wrapped_around
|| force_unlink
) {
109 * The on-disk ring-buffer has wrapped around.
110 * Newly created stream files will replace existing files. Since
111 * live clients may be consuming existing files, the file about
112 * to be replaced is unlinked in order to not overwrite its
115 status
= lttng_trace_chunk_unlink_file(trace_chunk
,
117 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
118 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
121 * Don't abort if the file doesn't exist, it is
122 * unexpected, but should not be a fatal error.
124 if (errno
!= ENOENT
) {
131 status
= lttng_trace_chunk_open_fs_handle(trace_chunk
, stream_path
,
132 flags
, mode
, out_file
, false);
133 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
/* Report the path that could not be opened, as the message announces a file. */
134 ERR("Failed to open stream file \"%s\"", stream_path
);
/*
 * Switch `stream` to a new data file for its pending rotation.
 *
 * Closes the current data file and resets tracefile_wrapped_around and
 * tracefile_current_index. When a next trace chunk is pending, a
 * subdirectory is created in it and the new data file is opened in
 * ongoing_rotation.value.next_trace_chunk. tracefile_size_current and
 * pos_after_last_complete_data_index are then reset, data_rotated is set,
 * and the rotation is completed if the index file was already rotated.
 */
142 static int stream_rotate_data_file(struct relay_stream
*stream
)
146 DBG("Rotating stream %" PRIu64
" data file with size %" PRIu64
,
147 stream
->stream_handle
, stream
->tracefile_size_current
);
150 fs_handle_close(stream
->file
);
154 stream
->tracefile_wrapped_around
= false;
155 stream
->tracefile_current_index
= 0;
157 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
158 enum lttng_trace_chunk_status chunk_status
;
160 chunk_status
= lttng_trace_chunk_create_subdirectory(
161 stream
->ongoing_rotation
.value
.next_trace_chunk
,
163 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
168 /* Rotate the data file. */
169 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
170 stream
->ongoing_rotation
.value
.next_trace_chunk
,
171 false, &stream
->file
);
173 ERR("Failed to rotate stream data file");
177 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
178 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
179 stream
->tracefile_size_current
= 0;
180 stream
->pos_after_last_complete_data_index
= 0;
181 stream
->ongoing_rotation
.value
.data_rotated
= true;
183 if (stream
->ongoing_rotation
.value
.index_rotated
) {
184 /* Rotation completed; reset its state. */
185 stream_complete_rotation(stream
);
192 * If too much data has been written in a tracefile before we received the
193 * rotation command, we have to move the excess data to the new tracefile and
194 * perform the rotation. This can happen because the control and data
195 * connections are separate, the indexes as well as the commands arrive from
196 * the control connection and we have no control over the order so we could be
197 * in a situation where too much data has been received on the data connection
198 * before the rotation command on the control connection arrives.
/*
 * Move the data written past the rotation point into the new data file.
 *
 * Requires a pending next trace chunk (a protocol error is logged
 * otherwise) and stream->lock; metadata streams are rejected by assertion.
 * The bytes between pos_after_last_complete_data_index and
 * tracefile_size_current of the previous file are copied, at most
 * FILE_IO_STACK_BUFFER_SIZE bytes per pass, into the file opened by
 * stream_rotate_data_file(). The previous file is then truncated at the
 * copy origin, relay_index_switch_all_files() updates the pending indexes,
 * and tracefile_size_current is set to the amount of data moved. A
 * reference on the previous trace chunk is held meanwhile so the old file
 * handle never outlives its chunk.
 */
200 static int rotate_truncate_stream(struct relay_stream
*stream
)
203 off_t lseek_ret
, previous_stream_copy_origin
;
204 uint64_t copy_bytes_left
, misplaced_data_size
;
205 bool acquired_reference
;
206 struct fs_handle
*previous_stream_file
= NULL
;
207 struct lttng_trace_chunk
*previous_chunk
= NULL
;
209 if (!LTTNG_OPTIONAL_GET(stream
->ongoing_rotation
).next_trace_chunk
) {
210 ERR("Protocol error encountered in %s(): stream rotation "
211 "sequence number is before the current sequence number "
212 "and the next trace chunk is unset. Honoring this "
213 "rotation command would result in data loss",
219 ASSERT_LOCKED(stream
->lock
);
221 * Acquire a reference to the current trace chunk to ensure
222 * it is not reclaimed when `stream_rotate_data_file` is called.
223 * Failing to do so would violate the contract of the trace
224 * chunk API as an active file descriptor would outlive the
227 acquired_reference
= lttng_trace_chunk_get(stream
->trace_chunk
);
228 assert(acquired_reference
);
229 previous_chunk
= stream
->trace_chunk
;
232 * Steal the stream's reference to its stream_fd. A new
233 * stream_fd will be created when the rotation completes and
234 * the original stream_fd will be used to copy the "extra" data
237 assert(stream
->file
);
238 previous_stream_file
= stream
->file
;
241 assert(!stream
->is_metadata
);
242 assert(stream
->tracefile_size_current
>
243 stream
->pos_after_last_complete_data_index
);
244 misplaced_data_size
= stream
->tracefile_size_current
-
245 stream
->pos_after_last_complete_data_index
;
246 copy_bytes_left
= misplaced_data_size
;
247 previous_stream_copy_origin
= stream
->pos_after_last_complete_data_index
;
249 ret
= stream_rotate_data_file(stream
);
254 assert(stream
->file
);
256 * Seek the current tracefile to the position at which the rotation
257 * should have occurred.
259 lseek_ret
= fs_handle_seek(previous_stream_file
, previous_stream_copy_origin
, SEEK_SET
);
261 PERROR("Failed to seek to offset %" PRIu64
262 " while copying extra data received before a stream rotation",
263 (uint64_t) previous_stream_copy_origin
);
268 /* Move data from the old file to the new file. */
269 while (copy_bytes_left
) {
271 char copy_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
272 const off_t copy_size_this_pass
= min_t(
273 off_t
, copy_bytes_left
, sizeof(copy_buffer
));
275 io_ret
= fs_handle_read(previous_stream_file
, copy_buffer
,
276 copy_size_this_pass
);
277 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
279 PERROR("Failed to read %" PRIu64
280 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
282 __FUNCTION__
, io_ret
,
283 stream
->stream_handle
);
285 ERR("Failed to read %" PRIu64
286 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
288 __FUNCTION__
, io_ret
,
289 stream
->stream_handle
);
295 io_ret
= fs_handle_write(
296 stream
->file
, copy_buffer
, copy_size_this_pass
);
297 if (io_ret
< (ssize_t
) copy_size_this_pass
) {
299 PERROR("Failed to write %" PRIu64
300 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
302 __FUNCTION__
, io_ret
,
303 stream
->stream_handle
);
305 ERR("Failed to write %" PRIu64
306 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64
,
308 __FUNCTION__
, io_ret
,
309 stream
->stream_handle
);
314 copy_bytes_left
-= copy_size_this_pass
;
317 /* Truncate the file to get rid of the excess data. */
318 ret
= fs_handle_truncate(
319 previous_stream_file
, previous_stream_copy_origin
);
/* off_t is signed: cast to match PRIu64, as the seek error path does. */
321 PERROR("Failed to truncate current stream file to offset %" PRIu64
,
322 (uint64_t) previous_stream_copy_origin
);
327 * Update the offset and FD of all the eventual indexes created by the
328 * data connection before the rotation command arrived.
330 ret
= relay_index_switch_all_files(stream
);
332 ERR("Failed to rotate index file");
336 stream
->tracefile_size_current
= misplaced_data_size
;
337 /* Index and data contents are back in sync. */
338 stream
->pos_after_last_complete_data_index
= 0;
341 lttng_trace_chunk_put(previous_chunk
);
346 * Check if a stream's data file (as opposed to index) should be rotated
347 * (for session rotation).
348 * Must be called with the stream lock held.
350 * Return 0 on success, a negative value on error.
/*
 * Nothing is done when no rotation is pending or when the data file was
 * already rotated. The data pivot is ongoing_rotation.value.prev_data_net_seq
 * (-1ULL until try_rotate_stream_index() sets it from prev_index_seq).
 * Rotation waits while prev_data_seq or the pivot is unknown (-1ULL), or
 * while prev_data_seq is below the pivot. If prev_data_seq is already past
 * the pivot, data belonging to the next chunk was written to the current
 * file and rotate_truncate_stream() moves it; otherwise (presumably when
 * prev_data_seq equals the pivot) stream_rotate_data_file() is called.
 * NOTE(review): the branch layout around that last call is not fully
 * visible in this view.
 */
352 static int try_rotate_stream_data(struct relay_stream
*stream
)
356 if (caa_likely(!stream
->ongoing_rotation
.is_set
)) {
357 /* No rotation expected. */
361 if (stream
->ongoing_rotation
.value
.data_rotated
) {
362 /* Rotation of the data file has already occurred. */
366 DBG("%s: Stream %" PRIu64
367 " (rotate_at_index_packet_seq_num = %" PRIu64
368 ", rotate_at_prev_data_net_seq = %" PRIu64
369 ", prev_data_seq = %" PRIu64
")",
370 __func__
, stream
->stream_handle
,
371 stream
->ongoing_rotation
.value
.packet_seq_num
,
372 stream
->ongoing_rotation
.value
.prev_data_net_seq
,
373 stream
->prev_data_seq
);
375 if (stream
->prev_data_seq
== -1ULL ||
376 stream
->ongoing_rotation
.value
.prev_data_net_seq
== -1ULL ||
377 stream
->prev_data_seq
<
378 stream
->ongoing_rotation
.value
.prev_data_net_seq
) {
380 * The next packet that will be written is not part of the next
383 DBG("Stream %" PRIu64
" data not yet ready for rotation "
384 "(rotate_at_index_packet_seq_num = %" PRIu64
385 ", rotate_at_prev_data_net_seq = %" PRIu64
386 ", prev_data_seq = %" PRIu64
")",
387 stream
->stream_handle
,
388 stream
->ongoing_rotation
.value
.packet_seq_num
,
389 stream
->ongoing_rotation
.value
.prev_data_net_seq
,
390 stream
->prev_data_seq
);
392 } else if (stream
->prev_data_seq
> stream
->ongoing_rotation
.value
.prev_data_net_seq
) {
394 * prev_data_seq is checked here since indexes and rotation
395 * commands are serialized with respect to each other.
397 DBG("Rotation after too much data has been written in tracefile "
398 "for stream %" PRIu64
", need to truncate before "
399 "rotating", stream
->stream_handle
);
400 ret
= rotate_truncate_stream(stream
);
402 ERR("Failed to truncate stream");
406 ret
= stream_rotate_data_file(stream
);
414 * Close the current index file if it is open, and create a new one.
416 * Return 0 on success, -1 on error.
/*
 * The reference on the current index file, if any, is put first. A
 * subdirectory is created in `chunk`. Its argument is not visible in this
 * view; presumably it is index_subpath, built by asprintf() as
 * "<path_name>/...". The new index file is then created with a format
 * version derived from the session's major/minor numbers.
 * Must be called with stream->lock held (asserted).
 *
 * NOTE(review): index_subpath is allocated by asprintf(); its release is
 * not visible in this view. Confirm it is freed on every path.
 */
418 static int create_index_file(struct relay_stream
*stream
,
419 struct lttng_trace_chunk
*chunk
)
422 uint32_t major
, minor
;
423 char *index_subpath
= NULL
;
424 enum lttng_trace_chunk_status status
;
426 ASSERT_LOCKED(stream
->lock
);
428 /* Put ref on previous index_file. */
429 if (stream
->index_file
) {
430 lttng_index_file_put(stream
->index_file
);
431 stream
->index_file
= NULL
;
433 major
= stream
->trace
->session
->major
;
434 minor
= stream
->trace
->session
->minor
;
440 ret
= asprintf(&index_subpath
, "%s/%s", stream
->path_name
,
446 status
= lttng_trace_chunk_create_subdirectory(chunk
,
449 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
453 status
= lttng_index_file_create_from_trace_chunk(
454 chunk
, stream
->path_name
,
455 stream
->channel_name
, stream
->tracefile_size
,
456 stream
->tracefile_current_index
,
457 lttng_to_index_major(major
, minor
),
458 lttng_to_index_minor(major
, minor
), true,
459 &stream
->index_file
);
460 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
472 * Check if a stream's index file should be rotated (for session rotation).
473 * Must be called with the stream lock held.
475 * Return 0 on success, a negative value on error.
/*
 * Nothing is done when no rotation is pending or when the index was
 * already rotated. The index is ready to rotate once received_packet_seq_num
 * is set and received_packet_seq_num + 1 >= ongoing_rotation.value.packet_seq_num.
 * Rotating puts the current index file and sets index_rotated. It also
 * records prev_index_seq as the data pivot (ongoing_rotation.value.prev_data_net_seq),
 * which try_rotate_stream_data() consults. If the data file was already
 * rotated, the rotation is completed.
 */
477 static int try_rotate_stream_index(struct relay_stream
*stream
)
481 if (!stream
->ongoing_rotation
.is_set
) {
482 /* No rotation expected. */
486 if (stream
->ongoing_rotation
.value
.index_rotated
) {
487 /* Rotation of the index has already occurred. */
491 DBG("%s: Stream %" PRIu64
492 " (rotate_at_packet_seq_num = %" PRIu64
493 ", received_packet_seq_num = "
494 "(value = %" PRIu64
", is_set = %" PRIu8
"))",
495 __func__
, stream
->stream_handle
,
496 stream
->ongoing_rotation
.value
.packet_seq_num
,
497 stream
->received_packet_seq_num
.value
,
498 stream
->received_packet_seq_num
.is_set
);
500 if (!stream
->received_packet_seq_num
.is_set
||
501 LTTNG_OPTIONAL_GET(stream
->received_packet_seq_num
) + 1 <
502 stream
->ongoing_rotation
.value
.packet_seq_num
) {
503 DBG("Stream %" PRIu64
" index not yet ready for rotation "
504 "(rotate_at_packet_seq_num = %" PRIu64
505 ", received_packet_seq_num = "
506 "(value = %" PRIu64
", is_set = %" PRIu8
"))",
507 stream
->stream_handle
,
508 stream
->ongoing_rotation
.value
.packet_seq_num
,
509 stream
->received_packet_seq_num
.value
,
510 stream
->received_packet_seq_num
.is_set
);
514 * The next index belongs to the new trace chunk; rotate.
515 * In overwrite mode, the packet seq num may jump over the
518 assert(LTTNG_OPTIONAL_GET(stream
->received_packet_seq_num
) + 1 >=
519 stream
->ongoing_rotation
.value
.packet_seq_num
);
520 DBG("Rotating stream %" PRIu64
" index file",
521 stream
->stream_handle
);
522 if (stream
->index_file
) {
523 lttng_index_file_put(stream
->index_file
);
524 stream
->index_file
= NULL
;
526 stream
->ongoing_rotation
.value
.index_rotated
= true;
529 * Set the rotation pivot position for the data, now that we have the
530 * net_seq_num matching the packet_seq_num index pivot position.
532 stream
->ongoing_rotation
.value
.prev_data_net_seq
=
533 stream
->prev_index_seq
;
534 if (stream
->ongoing_rotation
.value
.data_rotated
&&
535 stream
->ongoing_rotation
.value
.index_rotated
) {
536 /* Rotation completed; reset its state. */
537 DBG("Rotation completed for stream %" PRIu64
,
538 stream
->stream_handle
);
539 stream_complete_rotation(stream
);
/*
 * Make `chunk` the current trace chunk of `stream`.
 *
 * A subdirectory is created in `chunk`. The reference held on the
 * previous trace chunk is put and a new one is taken on `chunk`. The
 * current data file is then closed and a new one is opened in `chunk`.
 * The opening is done by stream_create_data_output_file_from_trace_chunk(),
 * which asserts that stream->lock is held.
 */
547 static int stream_set_trace_chunk(struct relay_stream
*stream
,
548 struct lttng_trace_chunk
*chunk
)
551 enum lttng_trace_chunk_status status
;
552 bool acquired_reference
;
554 status
= lttng_trace_chunk_create_subdirectory(chunk
,
556 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
561 lttng_trace_chunk_put(stream
->trace_chunk
);
562 acquired_reference
= lttng_trace_chunk_get(chunk
);
563 assert(acquired_reference
);
564 stream
->trace_chunk
= chunk
;
567 fs_handle_close(stream
->file
);
570 ret
= stream_create_data_output_file_from_trace_chunk(stream
, chunk
,
571 false, &stream
->file
);
577 * We keep ownership of path_name and channel_name.
/*
 * Allocate a relay stream for `trace` and register it.
 *
 * prev_data_seq, prev_index_seq, last_net_seq_num, ctf_stream_id and
 * beacon_ts_end start at -1ULL. The stream takes a reference on `trace`
 * and on the session's current trace chunk; creation fails if that chunk
 * reference cannot be acquired. It then opens its data file in that chunk
 * under stream->lock. Finally it is added to the session's recv list (its
 * streams are published once the end-of-stream message is received) and
 * to the global relay_streams_ht.
 */
579 struct relay_stream
*stream_create(struct ctf_trace
*trace
,
580 uint64_t stream_handle
, char *path_name
,
581 char *channel_name
, uint64_t tracefile_size
,
582 uint64_t tracefile_count
)
585 struct relay_stream
*stream
= NULL
;
586 struct relay_session
*session
= trace
->session
;
587 bool acquired_reference
= false;
588 struct lttng_trace_chunk
*current_trace_chunk
;
590 stream
= zmalloc(sizeof(struct relay_stream
));
591 if (stream
== NULL
) {
592 PERROR("relay stream zmalloc");
596 stream
->stream_handle
= stream_handle
;
597 stream
->prev_data_seq
= -1ULL;
598 stream
->prev_index_seq
= -1ULL;
599 stream
->last_net_seq_num
= -1ULL;
600 stream
->ctf_stream_id
= -1ULL;
601 stream
->tracefile_size
= tracefile_size
;
602 stream
->tracefile_count
= tracefile_count
;
603 stream
->path_name
= path_name
;
604 stream
->channel_name
= channel_name
;
605 stream
->beacon_ts_end
= -1ULL;
606 lttng_ht_node_init_u64(&stream
->node
, stream
->stream_handle
);
607 pthread_mutex_init(&stream
->lock
, NULL
);
608 urcu_ref_init(&stream
->ref
);
609 ctf_trace_get(trace
);
610 stream
->trace
= trace
;
612 pthread_mutex_lock(&trace
->session
->lock
);
613 current_trace_chunk
= trace
->session
->current_trace_chunk
;
614 if (current_trace_chunk
) {
615 acquired_reference
= lttng_trace_chunk_get(current_trace_chunk
);
617 pthread_mutex_unlock(&trace
->session
->lock
);
618 if (!acquired_reference
) {
619 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
625 stream
->indexes_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
626 if (!stream
->indexes_ht
) {
627 ERR("Cannot create indexes_ht");
632 pthread_mutex_lock(&stream
->lock
);
633 ret
= stream_set_trace_chunk(stream
, current_trace_chunk
);
634 pthread_mutex_unlock(&stream
->lock
);
636 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
637 trace
->session
->session_name
,
638 stream
->channel_name
);
642 stream
->tfa
= tracefile_array_create(stream
->tracefile_count
);
648 stream
->is_metadata
= !strcmp(stream
->channel_name
,
649 DEFAULT_METADATA_NAME
);
650 stream
->in_recv_list
= true;
653 * Add the stream in the recv list of the session. Once the end stream
654 * message is received, all session streams are published.
656 pthread_mutex_lock(&session
->recv_list_lock
);
657 cds_list_add_rcu(&stream
->recv_node
, &session
->recv_list
);
658 session
->stream_count
++;
659 pthread_mutex_unlock(&session
->recv_list_lock
);
662 * Both in the ctf_trace object and the global stream ht since the data
663 * side of the relayd does not have the concept of session.
665 lttng_ht_add_unique_u64(relay_streams_ht
, &stream
->node
);
666 stream
->in_stream_ht
= true;
668 DBG("Relay new stream added %s with ID %" PRIu64
, stream
->channel_name
,
669 stream
->stream_handle
);
675 fs_handle_close(stream
->file
);
/*
 * Drop the temporary reference acquired on current_trace_chunk above;
 * stream_set_trace_chunk() takes its own reference on the chunk.
 */
681 if (acquired_reference
) {
682 lttng_trace_chunk_put(current_trace_chunk
);
688 * path_name and channel_name need to be freed explicitly here
689 * because we cannot rely on stream_put().
697 * Called with the session lock held.
/*
 * Under stream->lock, move `stream` from its session's recv list to its
 * trace's stream list and mark it published. The session's
 * recv_list_lock and the trace's stream_list_lock guard the respective
 * list updates. An already-published stream is presumably left untouched
 * (the body of that branch is not visible in this view).
 */
699 void stream_publish(struct relay_stream
*stream
)
701 struct relay_session
*session
;
703 pthread_mutex_lock(&stream
->lock
);
704 if (stream
->published
) {
708 session
= stream
->trace
->session
;
710 pthread_mutex_lock(&session
->recv_list_lock
);
711 if (stream
->in_recv_list
) {
712 cds_list_del_rcu(&stream
->recv_node
);
713 stream
->in_recv_list
= false;
715 pthread_mutex_unlock(&session
->recv_list_lock
);
717 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
718 cds_list_add_rcu(&stream
->stream_node
, &stream
->trace
->stream_list
);
719 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
721 stream
->published
= true;
723 pthread_mutex_unlock(&stream
->lock
);
727 * Stream must be protected by holding the stream lock or by virtue of being
728 * called from stream_destroy.
/*
 * Undo the registrations of `stream`:
 * - if in_stream_ht is set, remove it from the global relay_streams_ht;
 * - if published is set, remove it from its trace's stream list (under
 *   stream_list_lock).
 * Each flag is cleared afterwards.
 * NOTE(review): the callers visible in this file are stream_release() and
 * try_stream_close(), not stream_destroy().
 */
730 static void stream_unpublish(struct relay_stream
*stream
)
732 if (stream
->in_stream_ht
) {
733 struct lttng_ht_iter iter
;
736 iter
.iter
.node
= &stream
->node
.node
;
737 ret
= lttng_ht_del(relay_streams_ht
, &iter
);
739 stream
->in_stream_ht
= false;
741 if (stream
->published
) {
742 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
743 cds_list_del_rcu(&stream
->stream_node
);
744 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
745 stream
->published
= false;
/*
 * Release the resources owned by `stream`: its indexes hash table, its
 * tracefile array, and the path_name/channel_name strings whose ownership
 * the stream took at creation. Invoked from stream_destroy_rcu().
 */
749 static void stream_destroy(struct relay_stream
*stream
)
751 if (stream
->indexes_ht
) {
753 * Calling lttng_ht_destroy in call_rcu worker thread so
754 * we don't hold the RCU read-side lock while calling
757 lttng_ht_destroy(stream
->indexes_ht
);
760 tracefile_array_destroy(stream
->tfa
);
762 free(stream
->path_name
);
763 free(stream
->channel_name
);
/* call_rcu() callback: recover the relay_stream embedding `rcu_head` (its rcu_node) and destroy it. */
767 static void stream_destroy_rcu(struct rcu_head
*rcu_head
)
769 struct relay_stream
*stream
=
770 caa_container_of(rcu_head
, struct relay_stream
, rcu_node
);
772 stream_destroy(stream
);
776 * No need to take stream->lock since this is only called on the final
777 * stream_put which ensures that a single thread may act on the stream.
/*
 * urcu_ref release callback, passed to urcu_ref_put() by stream_put().
 * It decrements the session's stream_count, unlinks the stream from the
 * session's recv list and unpublishes it. It then closes the data file and
 * drops its references on the index file and the trace.
 * stream_complete_rotation() runs before the trace chunk reference is put.
 * Destruction is deferred past an RCU grace period via
 * call_rcu(stream_destroy_rcu).
 */
779 static void stream_release(struct urcu_ref
*ref
)
781 struct relay_stream
*stream
=
782 caa_container_of(ref
, struct relay_stream
, ref
);
783 struct relay_session
*session
;
785 session
= stream
->trace
->session
;
787 DBG("Releasing stream id %" PRIu64
, stream
->stream_handle
);
789 pthread_mutex_lock(&session
->recv_list_lock
);
790 session
->stream_count
--;
791 if (stream
->in_recv_list
) {
792 cds_list_del_rcu(&stream
->recv_node
);
793 stream
->in_recv_list
= false;
795 pthread_mutex_unlock(&session
->recv_list_lock
);
797 stream_unpublish(stream
);
800 fs_handle_close(stream
->file
);
803 if (stream
->index_file
) {
804 lttng_index_file_put(stream
->index_file
);
805 stream
->index_file
= NULL
;
808 ctf_trace_put(stream
->trace
);
809 stream
->trace
= NULL
;
811 stream_complete_rotation(stream
);
812 lttng_trace_chunk_put(stream
->trace_chunk
);
813 stream
->trace_chunk
= NULL
;
815 call_rcu(&stream
->rcu_node
, stream_destroy_rcu
);
/*
 * Drop a reference on `stream`; stream_release() runs when the last one
 * is dropped. The refcount is asserted to be non-zero on entry.
 */
818 void stream_put(struct relay_stream
*stream
)
821 assert(stream
->ref
.refcount
!= 0);
823 * Wait until we have processed all the stream packets before
824 * actually putting our last stream reference.
826 urcu_ref_put(&stream
->ref
, stream_release
);
/*
 * Record a pending rotation of `stream` to `next_trace_chunk`, pivoting at
 * packet sequence number `rotation_sequence_number`.
 *
 * Setting a rotation while one is already pending is logged as a protocol
 * error. A reference is taken on `next_trace_chunk` when it is non-NULL.
 * Metadata streams have no index, so their index is considered rotated;
 * metadata_received is reset when a next chunk is given, and the data file
 * is rotated right away. For other streams, index rotation and then data
 * rotation are attempted immediately, in case the pivot was already
 * reached.
 */
830 int stream_set_pending_rotation(struct relay_stream
*stream
,
831 struct lttng_trace_chunk
*next_trace_chunk
,
832 uint64_t rotation_sequence_number
)
835 const struct relay_stream_rotation rotation
= {
836 .data_rotated
= false,
837 .index_rotated
= false,
838 .packet_seq_num
= rotation_sequence_number
,
839 .prev_data_net_seq
= -1ULL,
840 .next_trace_chunk
= next_trace_chunk
,
843 if (stream
->ongoing_rotation
.is_set
) {
844 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
849 if (next_trace_chunk
) {
850 const bool reference_acquired
=
851 lttng_trace_chunk_get(next_trace_chunk
);
853 assert(reference_acquired
);
855 LTTNG_OPTIONAL_SET(&stream
->ongoing_rotation
, rotation
);
857 DBG("Setting pending rotation: stream_id = %" PRIu64
858 ", rotate_at_packet_seq_num = %" PRIu64
,
859 stream
->stream_handle
, rotation_sequence_number
);
860 if (stream
->is_metadata
) {
862 * A metadata stream has no index; consider it already rotated.
864 stream
->ongoing_rotation
.value
.index_rotated
= true;
865 if (next_trace_chunk
) {
867 * The metadata will be received again in the new chunk.
869 stream
->metadata_received
= 0;
871 ret
= stream_rotate_data_file(stream
);
873 ret
= try_rotate_stream_index(stream
);
878 ret
= try_rotate_stream_data(stream
);
/*
 * Attempt to close `stream`.
 *
 * An already-closed stream is left alone (after a DBG message); otherwise
 * close_requested is set. When last_net_seq_num is unknown (-1ULL),
 * indexes for which only the data-side file descriptor was received are
 * dropped, and the highest pending index becomes the end-of-stream marker.
 * Closing is deferred while prev_data_seq is behind last_net_seq_num,
 * unless the session was aborted. The comparison uses a signed 64-bit
 * difference. Otherwise the stream is unpublished and marked closed, its
 * relay indexes are closed, and its data file, index file and trace chunk
 * references are released.
 */
887 void try_stream_close(struct relay_stream
*stream
)
889 bool session_aborted
;
890 struct relay_session
*session
= stream
->trace
->session
;
892 DBG("Trying to close stream %" PRIu64
, stream
->stream_handle
);
894 pthread_mutex_lock(&session
->lock
);
895 session_aborted
= session
->aborted
;
896 pthread_mutex_unlock(&session
->lock
);
898 pthread_mutex_lock(&stream
->lock
);
900 * Can be called concurently by connection close and reception of last
903 if (stream
->closed
) {
904 pthread_mutex_unlock(&stream
->lock
);
905 DBG("closing stream %" PRIu64
" aborted since it is already marked as closed", stream
->stream_handle
);
909 stream
->close_requested
= true;
911 if (stream
->last_net_seq_num
== -1ULL) {
913 * Handle connection close without explicit stream close
916 * We can be clever about indexes partially received in
917 * cases where we received the data socket part, but not
918 * the control socket part: since we're currently closing
919 * the stream on behalf of the control socket, we *know*
920 * there won't be any more control information for this
921 * socket. Therefore, we can destroy all indexes for
922 * which we have received only the file descriptor (from
923 * data socket). This takes care of consumerd crashes
924 * between sending the data and control information for
925 * a packet. Since those are sent in that order, we take
926 * care of consumerd crashes.
928 DBG("relay_index_close_partial_fd");
929 relay_index_close_partial_fd(stream
);
931 * Use the highest net_seq_num we currently have pending
932 * As end of stream indicator. Leave last_net_seq_num
933 * at -1ULL if we cannot find any index.
935 stream
->last_net_seq_num
= relay_index_find_last(stream
);
936 DBG("Updating stream->last_net_seq_num to %" PRIu64
, stream
->last_net_seq_num
);
937 /* Fall-through into the next check. */
940 if (stream
->last_net_seq_num
!= -1ULL &&
941 ((int64_t) (stream
->prev_data_seq
- stream
->last_net_seq_num
)) < 0
942 && !session_aborted
) {
944 * Don't close since we still have data pending. This
945 * handles cases where an explicit close command has
946 * been received for this stream, and cases where the
947 * connection has been closed, and we are awaiting for
948 * index information from the data socket. It is
949 * therefore expected that all the index fd information
950 * we need has already been received on the control
951 * socket. Matching index information from data socket
952 * should be Expected Soon(TM).
954 * TODO: We should implement a timer to garbage collect
955 * streams after a timeout to be resilient against a
956 * consumerd implementation that would not match this
959 pthread_mutex_unlock(&stream
->lock
);
960 DBG("closing stream %" PRIu64
" aborted since it still has data pending", stream
->stream_handle
);
964 * We received all the indexes we can expect.
966 stream_unpublish(stream
);
967 stream
->closed
= true;
968 /* Relay indexes are only used by the "consumer/sessiond" end. */
969 relay_index_close_all(stream
);
972 * If we are closed by an application exiting (per-pid buffers),
973 * we need to put our reference on the stream trace chunk right
974 * away, because otherwise still holding the reference on the
975 * trace chunk could allow a viewer stream (which holds a reference
976 * to the stream) to postpone destroy waiting for the chunk to cease
977 * to exist endlessly until the viewer is detached.
980 /* Put stream fd before put chunk. */
982 fs_handle_close(stream
->file
);
985 if (stream
->index_file
) {
986 lttng_index_file_put(stream
->index_file
);
987 stream
->index_file
= NULL
;
989 lttng_trace_chunk_put(stream
->trace_chunk
);
990 stream
->trace_chunk
= NULL
;
991 pthread_mutex_unlock(&stream
->lock
);
992 DBG("Succeeded in closing stream %" PRIu64
, stream
->stream_handle
);
/*
 * Prepare `stream` for an incoming packet of `packet_size` bytes.
 *
 * A packet for a stream without a data file or trace chunk is logged as a
 * protocol error. Size checks apply only when tracefile_size is non-zero.
 * If the packet would push the current file past tracefile_size:
 * - the stream moves to tracefile index
 *   (tracefile_current_index + 1) % tracefile_count, setting
 *   tracefile_wrapped_around when the index wraps;
 * - it closes its current file and opens the next one in its trace chunk;
 * - it resets tracefile_size_current and reports the switch through
 *   *file_rotated.
 * Must be called with stream->lock held (asserted).
 *
 * NOTE(review): the modulo divides by tracefile_count; confirm callers
 * guarantee it is non-zero whenever tracefile_size is non-zero.
 */
996 int stream_init_packet(struct relay_stream
*stream
, size_t packet_size
,
1001 ASSERT_LOCKED(stream
->lock
);
1003 if (!stream
->file
|| !stream
->trace_chunk
) {
1004 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64
", channel_name = %s",
1005 stream
->stream_handle
, stream
->channel_name
);
1010 if (caa_likely(stream
->tracefile_size
== 0)) {
1011 /* No size limit set; nothing to check. */
1016 * Check if writing the new packet would exceed the maximal file size.
1018 if (caa_unlikely((stream
->tracefile_size_current
+ packet_size
) >
1019 stream
->tracefile_size
)) {
1020 const uint64_t new_file_index
=
1021 (stream
->tracefile_current_index
+ 1) %
1022 stream
->tracefile_count
;
1024 if (new_file_index
< stream
->tracefile_current_index
) {
1025 stream
->tracefile_wrapped_around
= true;
1027 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
1028 ", current_file_size = %" PRIu64
1029 ", packet_size = %zu, current_file_index = %" PRIu64
1030 " new_file_index = %" PRIu64
,
1031 stream
->stream_handle
,
1032 stream
->tracefile_size_current
, packet_size
,
1033 stream
->tracefile_current_index
, new_file_index
);
1034 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_WRITE
);
1035 stream
->tracefile_current_index
= new_file_index
;
1038 fs_handle_close(stream
->file
);
1039 stream
->file
= NULL
;
1041 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
1042 stream
->trace_chunk
, false, &stream
->file
);
1044 ERR("Failed to perform trace file rotation of stream %" PRIu64
,
1045 stream
->stream_handle
);
1050 * Reset current size because we just performed a stream
1053 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
1054 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
1055 stream
->tracefile_size_current
= 0;
1056 *file_rotated
= true;
1058 *file_rotated
= false;
1064 /* Note that the packet is not necessarily complete. */
/*
 * Append `packet` and then `padding_len` zero bytes to the stream's data
 * file. A packet for a stream without a data file or trace chunk is logged
 * as a protocol error. Padding is written from a zero-filled stack buffer,
 * at most FILE_IO_STACK_BUFFER_SIZE bytes per write. For metadata streams,
 * metadata_received grows by the received length (packet size + padding).
 * no_new_metadata_notified is reset to false. Must be called with
 * stream->lock held (asserted).
 *
 * NOTE(review): `packet` is NULL-checked when computing the metadata and
 * log lengths; confirm the packet->data write is guarded the same way.
 */
1065 int stream_write(struct relay_stream
*stream
,
1066 const struct lttng_buffer_view
*packet
, size_t padding_len
)
1070 size_t padding_to_write
= padding_len
;
1071 char padding_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
1073 ASSERT_LOCKED(stream
->lock
);
1074 memset(padding_buffer
, 0,
1075 min(sizeof(padding_buffer
), padding_to_write
));
1077 if (!stream
->file
|| !stream
->trace_chunk
) {
1078 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64
", channel_name = %s",
1079 stream
->stream_handle
, stream
->channel_name
);
1084 write_ret
= fs_handle_write(
1085 stream
->file
, packet
->data
, packet
->size
);
1086 if (write_ret
!= packet
->size
) {
1087 PERROR("Failed to write to stream file of %sstream %" PRIu64
,
1088 stream
->is_metadata
? "metadata " : "",
1089 stream
->stream_handle
);
1095 while (padding_to_write
> 0) {
1096 const size_t padding_to_write_this_pass
=
1097 min(padding_to_write
, sizeof(padding_buffer
));
1099 write_ret
= fs_handle_write(stream
->file
, padding_buffer
,
1100 padding_to_write_this_pass
);
1101 if (write_ret
!= padding_to_write_this_pass
) {
1102 PERROR("Failed to write padding to file of %sstream %" PRIu64
,
1103 stream
->is_metadata
? "metadata " : "",
1104 stream
->stream_handle
);
1108 padding_to_write
-= padding_to_write_this_pass
;
1111 if (stream
->is_metadata
) {
1114 recv_len
= packet
? packet
->size
: 0;
1115 recv_len
+= padding_len
;
1116 stream
->metadata_received
+= recv_len
;
1118 stream
->no_new_metadata_notified
= false;
1122 DBG("Wrote to %sstream %" PRIu64
": data_length = %zu, padding_length = %zu",
1123 stream
->is_metadata
? "metadata " : "",
1124 stream
->stream_handle
,
1125 packet
? packet
->size
: (size_t) 0, padding_len
);
1131 * Update index after receiving a packet for a data stream.
1133 * Called with the stream lock held.
1135 * Return 0 on success else a negative value.
/*
 * The data offset recorded for `net_seq_num` is tracefile_size_current,
 * converted to big-endian with htobe64(). A new index file is created in
 * the stream's trace chunk first when `rotate_index` is set or no index
 * file is open. relay_index_try_flush() is then called:
 * - In the branch preceding the `ret > 0` case (presumably ret == 0, i.e.
 *   the index was written), the tracefile array read side is rotated and
 *   index_received_seqcount is committed and incremented.
 *   received_packet_seq_num is set from the index's packet_seq_num.
 * - When ret > 0, `total_size` is stored on the index.
 */
1137 int stream_update_index(struct relay_stream
*stream
, uint64_t net_seq_num
,
1138 bool rotate_index
, bool *flushed
, uint64_t total_size
)
1141 uint64_t data_offset
;
1142 struct relay_index
*index
;
1144 assert(stream
->trace_chunk
);
1145 ASSERT_LOCKED(stream
->lock
);
1146 /* Get data offset because we are about to update the index. */
1147 data_offset
= htobe64(stream
->tracefile_size_current
);
1149 DBG("handle_index_data: stream %" PRIu64
" net_seq_num %" PRIu64
" data offset %" PRIu64
,
1150 stream
->stream_handle
, net_seq_num
, stream
->tracefile_size_current
);
1153 * Lookup for an existing index for that stream id/sequence
1154 * number. If it exists, the control thread has already received the
1155 * data for it, thus we need to write it to disk.
1157 index
= relay_index_get_by_id_or_create(stream
, net_seq_num
);
1163 if (rotate_index
|| !stream
->index_file
) {
1164 ret
= create_index_file(stream
, stream
->trace_chunk
);
1166 ERR("Failed to create index file for stream %" PRIu64
,
1167 stream
->stream_handle
);
1168 /* Put self-ref for this index due to error. */
1169 relay_index_put(index
);
1175 if (relay_index_set_file(index
, stream
->index_file
, data_offset
)) {
1177 /* Put self-ref for this index due to error. */
1178 relay_index_put(index
);
1183 ret
= relay_index_try_flush(index
);
1185 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_READ
);
1186 tracefile_array_commit_seq(stream
->tfa
, stream
->index_received_seqcount
);
1187 stream
->index_received_seqcount
++;
1188 LTTNG_OPTIONAL_SET(&stream
->received_packet_seq_num
,
1189 be64toh(index
->index_data
.packet_seq_num
));
1191 } else if (ret
> 0) {
1192 index
->total_size
= total_size
;
1199 * relay_index_try_flush is responsible for the self-reference
1200 * put of the index object on error.
1202 ERR("relay_index_try_flush error %d", ret
);
1209 int stream_complete_packet(struct relay_stream
*stream
, size_t packet_total_size
,
1210 uint64_t sequence_number
, bool index_flushed
)
1214 ASSERT_LOCKED(stream
->lock
);
1216 stream
->tracefile_size_current
+= packet_total_size
;
1217 if (index_flushed
) {
1218 stream
->pos_after_last_complete_data_index
=
1219 stream
->tracefile_size_current
;
1220 stream
->prev_index_seq
= sequence_number
;
1221 ret
= try_rotate_stream_index(stream
);
1227 stream
->prev_data_seq
= sequence_number
;
1228 ret
= try_rotate_stream_data(stream
);
1234 int stream_add_index(struct relay_stream
*stream
,
1235 const struct lttcomm_relayd_index
*index_info
)
1238 struct relay_index
*index
;
1240 ASSERT_LOCKED(stream
->lock
);
1242 DBG("stream_add_index for stream %" PRIu64
, stream
->stream_handle
);
1244 /* Live beacon handling */
1245 if (index_info
->packet_size
== 0) {
1246 DBG("Received live beacon for stream %" PRIu64
,
1247 stream
->stream_handle
);
1250 * Only flag a stream inactive when it has already
1251 * received data and no indexes are in flight.
1253 if (stream
->index_received_seqcount
> 0
1254 && stream
->indexes_in_flight
== 0) {
1255 stream
->beacon_ts_end
= index_info
->timestamp_end
;
1260 stream
->beacon_ts_end
= -1ULL;
1263 if (stream
->ctf_stream_id
== -1ULL) {
1264 stream
->ctf_stream_id
= index_info
->stream_id
;
1267 index
= relay_index_get_by_id_or_create(stream
, index_info
->net_seq_num
);
1270 ERR("Failed to get or create index %" PRIu64
,
1271 index_info
->net_seq_num
);
1274 if (relay_index_set_control_data(index
, index_info
,
1275 stream
->trace
->session
->minor
)) {
1276 ERR("set_index_control_data error");
1277 relay_index_put(index
);
1281 ret
= relay_index_try_flush(index
);
1283 tracefile_array_file_rotate(stream
->tfa
, TRACEFILE_ROTATE_READ
);
1284 tracefile_array_commit_seq(stream
->tfa
, stream
->index_received_seqcount
);
1285 stream
->index_received_seqcount
++;
1286 stream
->pos_after_last_complete_data_index
+= index
->total_size
;
1287 stream
->prev_index_seq
= index_info
->net_seq_num
;
1288 LTTNG_OPTIONAL_SET(&stream
->received_packet_seq_num
,
1289 index_info
->packet_seq_num
);
1291 ret
= try_rotate_stream_index(stream
);
1295 ret
= try_rotate_stream_data(stream
);
1299 } else if (ret
> 0) {
1306 * relay_index_try_flush is responsible for the self-reference
1307 * put of the index object on error.
1309 ERR("relay_index_try_flush error %d", ret
);
1316 static void print_stream_indexes(struct relay_stream
*stream
)
1318 struct lttng_ht_iter iter
;
1319 struct relay_index
*index
;
1322 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
, index
,
1324 DBG("index %p net_seq_num %" PRIu64
" refcount %ld"
1325 " stream %" PRIu64
" trace %" PRIu64
1326 " session %" PRIu64
,
1329 stream
->ref
.refcount
,
1330 index
->stream
->stream_handle
,
1331 index
->stream
->trace
->id
,
1332 index
->stream
->trace
->session
->id
);
1337 int stream_reset_file(struct relay_stream
*stream
)
1339 ASSERT_LOCKED(stream
->lock
);
1344 ret
= fs_handle_close(stream
->file
);
1346 ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64
,
1347 stream
->channel_name
,
1348 stream
->stream_handle
);
1350 stream
->file
= NULL
;
1353 DBG("%s: reset tracefile_size_current for stream %" PRIu64
" was %" PRIu64
,
1354 __func__
, stream
->stream_handle
, stream
->tracefile_size_current
);
1355 stream
->tracefile_size_current
= 0;
1356 stream
->prev_data_seq
= 0;
1357 stream
->prev_index_seq
= 0;
1358 /* Note that this does not reset the tracefile array. */
1359 stream
->tracefile_current_index
= 0;
1360 stream
->pos_after_last_complete_data_index
= 0;
1362 return stream_create_data_output_file_from_trace_chunk(stream
,
1363 stream
->trace_chunk
, true, &stream
->file
);
1366 void print_relay_streams(void)
1368 struct lttng_ht_iter iter
;
1369 struct relay_stream
*stream
;
1371 if (!relay_streams_ht
) {
1376 cds_lfht_for_each_entry(relay_streams_ht
->ht
, &iter
.iter
, stream
,
1378 if (!stream_get(stream
)) {
1381 DBG("stream %p refcount %ld stream %" PRIu64
" trace %" PRIu64
1382 " session %" PRIu64
,
1384 stream
->ref
.refcount
,
1385 stream
->stream_handle
,
1387 stream
->trace
->session
->id
);
1388 print_stream_indexes(stream
);