-end_no_session:
- return ret;
-}
-
-/*
- * Open index file using a given viewer stream.
- *
- * Return 0 on success or else a negative value.
- */
-static int open_index(struct relay_viewer_stream *stream)
-{
- int ret;
- char fullpath[PATH_MAX];
- struct ctf_packet_index_file_hdr hdr;
-
- if (stream->tracefile_count > 0) {
- ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%"
- PRIu64 DEFAULT_INDEX_FILE_SUFFIX, stream->path_name,
- stream->channel_name, stream->tracefile_count_current);
- } else {
- ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s"
- DEFAULT_INDEX_FILE_SUFFIX, stream->path_name,
- stream->channel_name);
- }
- if (ret < 0) {
- PERROR("snprintf index path");
- goto error;
- }
-
- DBG("Opening index file %s in read only", fullpath);
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- if (errno == ENOENT) {
- ret = -ENOENT;
- goto error;
- } else {
- PERROR("opening index in read-only");
- }
- goto error;
- }
- stream->index_read_fd = ret;
- DBG("Opening index file %s in read only, (fd: %d)", fullpath, ret);
-
- ret = lttng_read(stream->index_read_fd, &hdr, sizeof(hdr));
- if (ret < sizeof(hdr)) {
- PERROR("Reading index header");
- goto error;
- }
- if (be32toh(hdr.magic) != CTF_INDEX_MAGIC) {
- ERR("Invalid header magic");
- ret = -1;
- goto error;
- }
- if (be32toh(hdr.index_major) != CTF_INDEX_MAJOR ||
- be32toh(hdr.index_minor) != CTF_INDEX_MINOR) {
- ERR("Invalid header version");
- ret = -1;
- goto error;
- }
- ret = 0;
-
-error:
- return ret;
-}
-
-/*
- * Allocate and init a new viewer_stream.
- *
- * Copies the values from the stream passed in parameter and insert the new
- * stream in the viewer_streams_ht.
- *
- * MUST be called with rcu_read_lock held.
- *
- * Returns 0 on success or a negative value on error.
- */
-static
-int init_viewer_stream(struct relay_stream *stream, int seek_last)
-{
- int ret;
- struct relay_viewer_stream *viewer_stream;
-
- assert(stream);
-
- viewer_stream = zmalloc(sizeof(*viewer_stream));
- if (!viewer_stream) {
- PERROR("relay viewer stream zmalloc");
- ret = -1;
- goto error;
- }
- viewer_stream->session_id = stream->session->id;
- viewer_stream->stream_handle = stream->stream_handle;
- viewer_stream->path_name = strndup(stream->path_name,
- LTTNG_VIEWER_PATH_MAX);
- viewer_stream->channel_name = strndup(stream->channel_name,
- LTTNG_VIEWER_NAME_MAX);
- viewer_stream->tracefile_count = stream->tracefile_count;
- viewer_stream->metadata_flag = stream->metadata_flag;
- viewer_stream->tracefile_count_last = -1ULL;
- if (seek_last) {
- viewer_stream->tracefile_count_current =
- stream->tracefile_count_current;
- } else {
- viewer_stream->tracefile_count_current =
- stream->oldest_tracefile_id;
- }
-
- viewer_stream->ctf_trace = stream->ctf_trace;
- if (viewer_stream->metadata_flag) {
- viewer_stream->ctf_trace->viewer_metadata_stream =
- viewer_stream;
- }
- uatomic_inc(&viewer_stream->ctf_trace->refcount);
-
- lttng_ht_node_init_u64(&viewer_stream->stream_n, stream->stream_handle);
- lttng_ht_add_unique_u64(viewer_streams_ht, &viewer_stream->stream_n);
-
- viewer_stream->index_read_fd = -1;
- viewer_stream->read_fd = -1;
-
- /*
- * This is to avoid a race between the initialization of this object and
- * the close of the given stream. If the stream is unable to find this
- * viewer stream when closing, this copy will at least take the latest
- * value.
- * We also need that for the seek_last.
- */
- viewer_stream->total_index_received = stream->total_index_received;
-
- /*
- * If we never received an index for the current stream, delay
- * the opening of the index, otherwise open it right now.
- */
- if (viewer_stream->tracefile_count_current ==
- stream->tracefile_count_current &&
- viewer_stream->total_index_received == 0) {
- viewer_stream->index_read_fd = -1;
- } else {
- ret = open_index(viewer_stream);
- if (ret < 0) {
- goto error;
- }
- }
-
- if (seek_last && viewer_stream->index_read_fd > 0) {
- ret = lseek(viewer_stream->index_read_fd,
- viewer_stream->total_index_received *
- sizeof(struct ctf_packet_index),
- SEEK_CUR);
- if (ret < 0) {
- goto error;
- }
- viewer_stream->last_sent_index =
- viewer_stream->total_index_received;
- }
-
- ret = 0;
-
-error:
- return ret;
-}
-
-/*
- * Rotate a stream to the next tracefile.
- *
- * Returns 0 on success, 1 on EOF, a negative value on error.
- */
-static
-int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream,
- struct relay_stream *stream)
-{
- int ret;
- uint64_t tracefile_id;
-
- assert(viewer_stream);
-
- tracefile_id = (viewer_stream->tracefile_count_current + 1) %
- viewer_stream->tracefile_count;
- /*
- * Detect the last tracefile to open.
- */
- if (viewer_stream->tracefile_count_last != -1ULL &&
- viewer_stream->tracefile_count_last ==
- viewer_stream->tracefile_count_current) {
- ret = 1;
- goto end;
- }
-
- if (stream) {
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- }
- /*
- * The writer and the reader are not working in the same
- * tracefile, we can read up to EOF, we don't care about the
- * total_index_received.
- */
- if (!stream || (stream->tracefile_count_current != tracefile_id)) {
- viewer_stream->close_write_flag = 1;
- } else {
- /*
- * We are opening a file that is still open in write, make
- * sure we limit our reading to the number of indexes
- * received.
- */
- viewer_stream->close_write_flag = 0;
- if (stream) {
- viewer_stream->total_index_received =
- stream->total_index_received;
- }
- }
- viewer_stream->tracefile_count_current = tracefile_id;
-
- ret = close(viewer_stream->index_read_fd);
- if (ret < 0) {
- PERROR("close index file %d",
- viewer_stream->index_read_fd);
- }
- viewer_stream->index_read_fd = -1;
- ret = close(viewer_stream->read_fd);
- if (ret < 0) {
- PERROR("close tracefile %d",
- viewer_stream->read_fd);
- }
- viewer_stream->read_fd = -1;
-
- pthread_mutex_lock(&viewer_stream->overwrite_lock);
- viewer_stream->abort_flag = 0;
- pthread_mutex_unlock(&viewer_stream->overwrite_lock);
-
- viewer_stream->index_read_fd = -1;
- viewer_stream->read_fd = -1;
-
- if (stream) {
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- }
- ret = open_index(viewer_stream);
- if (ret < 0) {
- goto error;
- }
-
- ret = 0;
-
-end:
-error: