#include <common/consumer/consumer-stream.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/align.h>
+#include <common/consumer/consumer-metadata-cache.h>
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
stream->session_id = session_id;
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
- stream->index_fd = -1;
+ stream->index_file = NULL;
stream->last_sequence_number = -1ULL;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
CDS_INIT_LIST_HEAD(&channel->streams.head);
- DBG("Allocated channel (key %" PRIu64 ")", channel->key)
+ DBG("Allocated channel (key %" PRIu64 ")", channel->key);
end:
return channel;
DBG("Consumer flag that it should quit");
}
+
+/*
+ * Flush pending writes to trace output disk file.
+ */
+static
void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
off_t orig_offset)
{
ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
stream->max_sb_size, POSIX_FADV_DONTNEED);
if (ret && ret != -ENOSYS) {
- errno = -ret;
- PERROR("posix_fadvise");
+ errno = ret;
+ PERROR("posix_fadvise on fd %i", outfd);
}
}
}
outfd = stream->out_fd;
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!stream->index_file) {
goto end;
}
- stream->index_fd = ret;
}
/* Reset current size because we just perform a rotation. */
lttng_sync_file_range(outfd, stream->out_fd_offset, len,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += len;
+ lttng_consumer_sync_trace_file(stream, orig_offset);
}
- lttng_consumer_sync_trace_file(stream, orig_offset);
write_error:
/*
}
outfd = stream->out_fd;
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- written = ret;
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!stream->index_file) {
goto end;
}
- stream->index_fd = ret;
}
/* Reset current size because we just perform a rotation. */
stream->output_written += ret_splice;
written += ret_splice;
}
- lttng_consumer_sync_trace_file(stream, orig_offset);
+ if (!relayd) {
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+ }
goto end;
write_error:
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
pthread_mutex_lock(&stream->lock);
/* Remove any reference to that stream. */
stream->chan->metadata_stream = NULL;
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
- * a negative len, it means an error occured thus we
+ * a negative len, it means an error occurred thus we
* simply remove it from the poll set and free the
* stream.
*/
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
- * a negative len, it means an error occured thus we
+ * a negative len, it means an error occurred thus we
* simply remove it from the poll set and free the
* stream.
*/