+ ret = lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position during channel rotation");
+ goto end_unlock_stream;
+ }
+ }
+ /*
+ * Align produced position on the start-of-packet boundary of the first
+ * packet going into the next trace chunk.
+ */
+ produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+ if (consumed_pos == produced_pos) {
+ DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ stream->rotate_ready = true;
+ } else {
+ DBG("Different consumed and produced positions "
+ "for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ }
+ /*
+ * The rotation position is based on the packet_seq_num of the
+ * packet following the last packet that was consumed for this
+ * stream, incremented by the offset between produced and
+ * consumed positions. This rotation position is a lower bound
+ * (inclusive) at which the next trace chunk starts. Since it
+ * is a lower bound, it is OK if the packet_seq_num does not
+ * correspond exactly to the same packet identified by the
+ * consumed_pos, which can happen in overwrite mode.
+ */
+ if (stream->sequence_number_unavailable) {
+ /*
+ * Rotation should never be performed on a session which
+ * interacts with a pre-2.8 lttng-modules, which does
+ * not implement packet sequence number.
+ */
+ ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
+ stream->key);
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ stream->rotate_position = stream->last_sequence_number + 1 +
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
+ DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
+ stream->key, stream->rotate_position);
+
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk".
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = stream->rotate_position,
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
+
+ stream->opened_packet_in_current_trace_chunk = false;
+
+ if (rotating_to_new_chunk && !stream->metadata_flag) {
+ /*
+ * Attempt to flush an empty packet as close to the
+ * rotation point as possible. In the event where a
+ * stream remains inactive after the rotation point,
+ * this ensures that the new trace chunk has a
+ * beginning timestamp set at the begining of the
+ * trace chunk instead of only creating an empty
+ * packet when the trace chunk is stopped.
+ *
+ * This indicates to the viewers that the stream
+ * was being recorded, but more importantly it
+ * allows viewers to determine a useable trace
+ * intersection.
+ *
+ * This presents a problem in the case where the
+ * ring-buffer is completely full.
+ *
+ * Consider the following scenario:
+ * - The consumption of data is slow (slow network,
+ * for instance),
+ * - The ring buffer is full,
+ * - A rotation is initiated,
+ * - The flush below does nothing (no space left to
+ * open a new packet),
+ * - The other streams rotate very soon, and new
+ * data is produced in the new chunk,
+ * - This stream completes its rotation long after the
+ * rotation was initiated
+ * - The session is stopped before any event can be
+ * produced in this stream's buffers.
+ *
+ * The resulting trace chunk will have a single packet
+ * temporaly at the end of the trace chunk for this
+ * stream making the stream intersection more narrow
+ * than it should be.
+ *
+ * To work-around this, an empty flush is performed
+ * after the first consumption of a packet during a
+ * rotation if open_packet fails. The idea is that
+ * consuming a packet frees enough space to switch
+ * packets in this scenario and allows the tracer to
+ * "stamp" the beginning of the new trace chunk at the
+ * earliest possible point.
+ *
+ * The packet open is performed after the channel
+ * rotation to ensure that no attempt to open a packet
+ * is performed in a stream that has no active trace
+ * chunk.
+ */
+ ret = lttng_dynamic_pointer_array_add_pointer(
+ &streams_packet_to_open, stream);
+ if (ret) {
+ PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ }
+
+ pthread_mutex_unlock(&stream->lock);
+ }
+ stream = NULL;
+
+ if (!is_local_trace) {
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end_unlock_channel;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer
+ .data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_unlock_channel;
+ }
+ }
+
+ for (stream_idx = 0;
+ stream_idx < lttng_dynamic_pointer_array_get_count(
+ &streams_packet_to_open);
+ stream_idx++) {
+ enum open_packet_status status;
+
+ stream = lttng_dynamic_pointer_array_get_pointer(
+ &streams_packet_to_open, stream_idx);
+
+ pthread_mutex_lock(&stream->lock);
+ status = open_packet(stream);
+ pthread_mutex_unlock(&stream->lock);
+ switch (status) {
+ case OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left
+ * in the buffer. A new packet will be opened
+ * once one has been consumed.
+ */
+ DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end_unlock_channel;
+ default:
+ abort();
+ }
+ }
+
+ pthread_mutex_unlock(&channel->lock);
+ ret = 0;
+ goto end;
+
+end_unlock_stream:
+ pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
+ pthread_mutex_unlock(&channel->lock);
+end:
+ rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
+ lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
+ return ret;
+}
+
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+ unsigned long consumed_pos_before, consumed_pos_after;
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ ret = kernctl_buffer_clear(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to clear kernel stream (ret = %d)", ret);
+ goto end;
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustconsumer_clear_buffer(stream);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+ DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
+end:
+ return ret;
+}
+
+static
+int consumer_clear_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ret = consumer_flush_buffer(stream, 1);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel clear",
+ stream->key);
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
+ ret = consumer_clear_buffer(stream);
+ if (ret < 0) {
+ ERR("Failed to clear stream %" PRIu64 " during channel clear",
+ stream->key);
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+ return ret;
+}
+
+static
+int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ rcu_read_lock();
+ pthread_mutex_lock(&channel->lock);
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ health_code_update();
+ pthread_mutex_lock(&stream->lock);
+ ret = consumer_clear_stream(stream);
+ if (ret) {
+ goto error_unlock;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+ pthread_mutex_unlock(&channel->lock);
+ rcu_read_unlock();
+ return 0;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Check if a stream is ready to be rotated after extracting it.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
+ */
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
+{
+ DBG("Check is rotate ready for stream %" PRIu64
+ " ready %u rotate_position %" PRIu64
+ " last_sequence_number %" PRIu64,
+ stream->key, stream->rotate_ready,
+ stream->rotate_position, stream->last_sequence_number);
+ if (stream->rotate_ready) {
+ return 1;
+ }
+
+ /*
+ * If packet seq num is unavailable, it means we are interacting
+ * with a pre-2.8 lttng-modules which does not implement the
+ * sequence number. Rotation should never be used by sessiond in this
+ * scenario.
+ */
+ if (stream->sequence_number_unavailable) {
+ ERR("Internal error: rotation used on stream %" PRIu64
+ " with unavailable sequence number",
+ stream->key);
+ return -1;
+ }
+
+ if (stream->rotate_position == -1ULL ||
+ stream->last_sequence_number == -1ULL) {
+ return 0;
+ }
+
+ /*
+ * Rotate position not reached yet. The stream rotate position is
+ * the position of the next packet belonging to the next trace chunk,
+ * but consumerd considers rotation ready when reaching the last
+ * packet of the current chunk, hence the "rotate_position - 1".
+ */
+
+ DBG("Check is rotate ready for stream %" PRIu64
+ " last_sequence_number %" PRIu64
+ " rotate_position %" PRIu64,
+ stream->key, stream->last_sequence_number,
+ stream->rotate_position);
+ if (stream->last_sequence_number >= stream->rotate_position - 1) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * Reset the state for a stream after a rotation occurred.
+ */
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
+{
+ DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64,
+ stream->key);
+ stream->rotate_position = -1ULL;
+ stream->rotate_ready = false;
+}
+
+/*
+ * Perform the rotation a local stream file.
+ */
+static
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+
+ DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
+ stream->key,
+ stream->chan->key);
+ stream->tracefile_size_current = 0;
+ stream->tracefile_count_current = 0;
+
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret) {
+ PERROR("Failed to close stream out_fd of channel \"%s\"",
+ stream->chan->name);
+ }
+ stream->out_fd = -1;
+ }
+
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+
+ if (!stream->trace_chunk) {
+ goto end;
+ }
+
+ ret = consumer_stream_create_output_files(stream, true);
+end:
+ return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the channel and stream locks held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ DBG("Consumer rotate stream %" PRIu64, stream->key);
+
+ /*
+ * Update the stream's 'current' chunk to the session's (channel)
+ * now-current chunk.
+ */
+ lttng_trace_chunk_put(stream->trace_chunk);
+ if (stream->chan->trace_chunk == stream->trace_chunk) {
+ /*
+ * A channel can be rotated and not have a "next" chunk
+ * to transition to. In that case, the channel's "current chunk"
+ * has not been closed yet, but it has not been updated to
+ * a "next" trace chunk either. Hence, the stream, like its
+ * parent channel, becomes part of no chunk and can't output
+ * anything until a new trace chunk is created.
+ */
+ stream->trace_chunk = NULL;
+ } else if (stream->chan->trace_chunk &&
+ !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
+ ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");