const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
ASSERT_LOCKED(stream->lock);
- assert(stream->trace_chunk);
ret = utils_stream_file_path(stream->path_name, stream->channel_name,
stream->tracefile_size, stream->tracefile_current_index,
struct stream_fd *previous_stream_fd = NULL;
struct lttng_trace_chunk *previous_chunk = NULL;
- if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+ if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
ERR("Protocol error encoutered in %s(): stream rotation "
"sequence number is before the current sequence number "
"and the next trace chunk is unset. Honoring this "
stream_put(stream);
stream = NULL;
}
- lttng_trace_chunk_put(current_trace_chunk);
+ if (acquired_reference) {
+ lttng_trace_chunk_put(current_trace_chunk);
+ }
return stream;
error_no_alloc:
stream->closed = true;
/* Relay indexes are only used by the "consumer/sessiond" end. */
relay_index_close_all(stream);
+
+ /*
+ * If we are closed by an application exiting (per-pid buffers),
+ * we need to put our reference on the stream trace chunk right
+ * away, because otherwise still holding the reference on the
+ * trace chunk could allow a viewer stream (which holds a reference
+ * to the stream) to postpone destroy waiting for the chunk to cease
+ * to exist endlessly until the viewer is detached.
+ */
+
+ /* Put stream fd before put chunk. */
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
pthread_mutex_unlock(&stream->lock);
DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
stream_put(stream);
int ret = 0;
ASSERT_LOCKED(stream->lock);
+
+ if (!stream->stream_fd || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
+
if (caa_likely(stream->tracefile_size == 0)) {
/* No size limit set; nothing to check. */
goto end;
stream->stream_handle,
stream->tracefile_size_current, packet_size,
stream->tracefile_current_index, new_file_index);
- tracefile_array_file_rotate(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
if (stream->stream_fd) {
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
+ if (!stream->stream_fd || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
if (packet) {
write_ret = lttng_write(stream->stream_fd->fd,
packet->data, packet->size);
uint64_t data_offset;
struct relay_index *index;
+ assert(stream->trace_chunk);
ASSERT_LOCKED(stream->lock);
/* Get data offset because we are about to update the index. */
data_offset = htobe64(stream->tracefile_size_current);
ret = relay_index_try_flush(index);
if (ret == 0) {
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
*flushed = true;
stream->prev_data_seq = sequence_number;
ret = try_rotate_stream_data(stream);
- if (ret < 0) {
- goto end;
- }
+
end:
return ret;
}
}
ret = relay_index_try_flush(index);
if (ret == 0) {
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;