#include <common/consumer/consumer-stream.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/align.h>
+#include <common/consumer/consumer-metadata-cache.h>
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
stream->session_id = session_id;
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
- stream->index_fd = -1;
+ stream->index_file = NULL;
stream->last_sequence_number = -1ULL;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
}
outfd = stream->out_fd;
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!stream->index_file) {
goto end;
}
- stream->index_fd = ret;
}
/* Reset current size because we just perform a rotation. */
}
outfd = stream->out_fd;
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- written = ret;
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!stream->index_file) {
goto end;
}
- stream->index_fd = ret;
}
/* Reset current size because we just perform a rotation. */
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
+ if (stream->chan->metadata_cache) {
+ /* Only applicable to userspace consumers. */
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ }
/* Remove any reference to that stream. */
consumer_stream_delete(stream, ht);
*/
stream->chan->metadata_stream = NULL;
+ if (stream->chan->metadata_cache) {
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ }
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);