+/* Stream lock must be held by the caller. */
+static int sample_stream_positions(struct lttng_consumer_stream *stream,
+ unsigned long *produced, unsigned long *consumed)
+{
+ int ret;
+
+ ASSERT_LOCKED(stream->lock);
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to sample snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream, produced);
+ if (ret < 0) {
+ ERR("Failed to sample produced position");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position");
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Sample the rotate position for all the streams of a channel. If a stream
+ * is already at the rotate position (produced == consumed), we flag it as
+ * ready for rotation. The rotation of ready streams occurs after we have
+ * replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+ uint64_t key, uint64_t relayd_id, uint32_t metadata,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_dynamic_array stream_rotation_positions;
+ uint64_t next_chunk_id, stream_count = 0;
+ enum lttng_trace_chunk_status chunk_status;
+ const bool is_local_trace = relayd_id == -1ULL;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool rotating_to_new_chunk = true;
+ /* Array of `struct lttng_consumer_stream *` */
+ struct lttng_dynamic_pointer_array streams_packet_to_open;
+ size_t stream_idx;
+
+ DBG("Consumer sample rotate position for channel %" PRIu64, key);
+
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position), NULL);
+ lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
+
+ rcu_read_lock();
+
+ pthread_mutex_lock(&channel->lock);
+ assert(channel->trace_chunk);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+ &next_chunk_id);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end_unlock_channel;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ unsigned long produced_pos = 0, consumed_pos = 0;
+
+ health_code_update();
+
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
+ }
+
+ /*
+ * Do not flush a packet when rotating from a NULL trace
+ * chunk. The stream has no means to output data, and the prior
+ * rotation which rotated to NULL performed that side-effect
+ * already. No new data can be produced when a stream has no
+ * associated trace chunk (e.g. a stop followed by a rotate).
+ */
+ if (stream->trace_chunk) {
+ bool flush_active;
+
+ if (stream->metadata_flag) {
+ /*
+ * Don't produce an empty metadata packet,
+ * simply close the current one.
+ *
+ * Metadata is regenerated on every trace chunk
+ * switch; there is no concern that no data was
+ * produced.
+ */
+ flush_active = true;
+ } else {
+ /*
+ * Only flush an empty packet if the "packet
+ * open" could not be performed on transition
+ * to a new trace chunk and no packets were
+ * consumed within the chunk's lifetime.
+ */
+ if (stream->opened_packet_in_current_trace_chunk) {
+ flush_active = true;
+ } else {
+ /*
+ * Stream could have been full at the
+ * time of rotation, but then have had
+ * no activity at all.
+ *
+ * It is important to flush a packet
+ * to prevent 0-length files from being
+ * produced as most viewers choke on
+ * them.
+ *
+ * Unfortunately viewers will not be
+ * able to know that tracing was active
+ * for this stream during this trace
+ * chunk's lifetime.
+ */
+ ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+ if (ret) {
+ goto end_unlock_stream;
+ }
+
+ /*
+ * Don't flush an empty packet if data
+ * was produced; it will be consumed
+ * before the rotation completes.
+ */
+ flush_active = produced_pos != consumed_pos;
+ if (!flush_active) {
+ enum lttng_trace_chunk_status chunk_status;
+ const char *trace_chunk_name;
+ uint64_t trace_chunk_id;
+
+ chunk_status = lttng_trace_chunk_get_name(
+ stream->trace_chunk,
+ &trace_chunk_name,
+ NULL);
+ if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
+ trace_chunk_name = "none";
+ }
+
+ /*
+ * Consumer trace chunks are
+ * never anonymous.
+ */
+ chunk_status = lttng_trace_chunk_get_id(
+ stream->trace_chunk,
+ &trace_chunk_id);
+ assert(chunk_status ==
+ LTTNG_TRACE_CHUNK_STATUS_OK);
+
+ DBG("Unable to open packet for stream during trace chunk's lifetime. "
+ "Flushing an empty packet to prevent an empty file from being created: "
+ "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+ stream->key, trace_chunk_name, trace_chunk_id);
+ }
+ }
+ }
+
+ /*
+ * Close the current packet before sampling the
+ * ring buffer positions.
+ */
+ ret = consumer_flush_buffer(stream, flush_active);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+ stream->key);
+ goto end_unlock_stream;
+ }
+ }
+
+ ret = lttng_consumer_take_snapshot(stream);
+ if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+ ERR("Failed to sample snapshot position during channel rotation");
+ goto end_unlock_stream;
+ }
+ if (!ret) {
+ ret = lttng_consumer_get_produced_snapshot(stream,
+ &produced_pos);
+ if (ret < 0) {
+ ERR("Failed to sample produced position during channel rotation");
+ goto end_unlock_stream;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position during channel rotation");
+ goto end_unlock_stream;
+ }
+ }
+ /*
+ * Align produced position on the start-of-packet boundary of the first
+ * packet going into the next trace chunk.
+ */
+ produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+ if (consumed_pos == produced_pos) {
+ DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ stream->rotate_ready = true;
+ } else {
+ DBG("Different consumed and produced positions "
+ "for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ }
+ /*
+ * The rotation position is based on the packet_seq_num of the
+ * packet following the last packet that was consumed for this
+ * stream, incremented by the offset between produced and
+ * consumed positions. This rotation position is a lower bound
+ * (inclusive) at which the next trace chunk starts. Since it
+ * is a lower bound, it is OK if the packet_seq_num does not
+ * correspond exactly to the same packet identified by the
+ * consumed_pos, which can happen in overwrite mode.
+ */
+ if (stream->sequence_number_unavailable) {
+ /*
+ * Rotation should never be performed on a session which
+ * interacts with a pre-2.8 lttng-modules, which does
+ * not implement packet sequence number.
+ */
+ ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
+ stream->key);
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ stream->rotate_position = stream->last_sequence_number + 1 +
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
+ DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
+ stream->key, stream->rotate_position);
+
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk".
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = stream->rotate_position,
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
+
+ stream->opened_packet_in_current_trace_chunk = false;
+
+ if (rotating_to_new_chunk && !stream->metadata_flag) {
+ /*
+ * Attempt to flush an empty packet as close to the
+ * rotation point as possible. In the event where a
+ * stream remains inactive after the rotation point,
+ * this ensures that the new trace chunk has a
+ * beginning timestamp set at the begining of the
+ * trace chunk instead of only creating an empty
+ * packet when the trace chunk is stopped.
+ *
+ * This indicates to the viewers that the stream
+ * was being recorded, but more importantly it
+ * allows viewers to determine a useable trace
+ * intersection.
+ *
+ * This presents a problem in the case where the
+ * ring-buffer is completely full.
+ *
+ * Consider the following scenario:
+ * - The consumption of data is slow (slow network,
+ * for instance),
+ * - The ring buffer is full,
+ * - A rotation is initiated,
+ * - The flush below does nothing (no space left to
+ * open a new packet),
+ * - The other streams rotate very soon, and new
+ * data is produced in the new chunk,
+ * - This stream completes its rotation long after the
+ * rotation was initiated
+ * - The session is stopped before any event can be
+ * produced in this stream's buffers.
+ *
+ * The resulting trace chunk will have a single packet
+ * temporaly at the end of the trace chunk for this
+ * stream making the stream intersection more narrow
+ * than it should be.
+ *
+ * To work-around this, an empty flush is performed
+ * after the first consumption of a packet during a
+ * rotation if open_packet fails. The idea is that
+ * consuming a packet frees enough space to switch
+ * packets in this scenario and allows the tracer to
+ * "stamp" the beginning of the new trace chunk at the
+ * earliest possible point.
+ *
+ * The packet open is performed after the channel
+ * rotation to ensure that no attempt to open a packet
+ * is performed in a stream that has no active trace
+ * chunk.
+ */
+ ret = lttng_dynamic_pointer_array_add_pointer(
+ &streams_packet_to_open, stream);
+ if (ret) {
+ PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ }
+
+ pthread_mutex_unlock(&stream->lock);
+ }
+ stream = NULL;
+
+ if (!is_local_trace) {
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end_unlock_channel;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer
+ .data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_unlock_channel;
+ }
+ }
+
+ for (stream_idx = 0;
+ stream_idx < lttng_dynamic_pointer_array_get_count(
+ &streams_packet_to_open);
+ stream_idx++) {
+ enum open_packet_status status;
+
+ stream = lttng_dynamic_pointer_array_get_pointer(
+ &streams_packet_to_open, stream_idx);
+
+ pthread_mutex_lock(&stream->lock);
+ status = open_packet(stream);
+ pthread_mutex_unlock(&stream->lock);
+ switch (status) {
+ case OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left
+ * in the buffer. A new packet will be opened
+ * once one has been consumed.
+ */
+ DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end_unlock_channel;
+ default:
+ abort();
+ }
+ }
+
+ pthread_mutex_unlock(&channel->lock);
+ ret = 0;
+ goto end;
+
+end_unlock_stream:
+ pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
+ pthread_mutex_unlock(&channel->lock);
+end:
+ rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
+ lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
+ return ret;
+}
+