#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/utils.h>
+#include <common/time.h>
#include <common/compat/poll.h>
#include <common/compat/endian.h>
#include <common/index/index.h>
#include <common/consumer/consumer-stream.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/align.h>
+#include <common/consumer/consumer-metadata-cache.h>
+#include <common/trace-chunk.h>
+#include <common/trace-chunk-registry.h>
+#include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
uint64_t key; /* del */
};
+/* Flag used to temporarily pause data consumption from testpoints. */
+int data_consumption_paused;
+
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
* Also updated by the signal handler (consumer_should_exit()). Read by the
* polling threads.
*/
-volatile int consumer_quit;
+int consumer_quit;
/*
* Global hash table containing respectively metadata and data streams. The
(void) relayd_close(&relayd->control_sock);
(void) relayd_close(&relayd->data_sock);
+ pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
free(relayd);
}
*/
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
- int ret;
struct lttng_ht_iter iter;
DBG("Consumer delete channel key %" PRIu64, channel->key);
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
}
+ if (channel->monitor_timer_enabled == 1) {
+ consumer_timer_monitor_stop(channel);
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
goto end;
}
- rcu_read_lock();
- iter.iter.node = &channel->node.node;
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
+ lttng_trace_chunk_put(channel->trace_chunk);
+ channel->trace_chunk = NULL;
+
+ if (channel->is_published) {
+ int ret;
+
+ rcu_read_lock();
+ iter.iter.node = &channel->node.node;
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+
+ iter.iter.node = &channel->channels_by_session_id_ht_node.node;
+ ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
+ &iter);
+ assert(!ret);
+ rcu_read_unlock();
+ }
+ channel->is_deleted = true;
call_rcu(&channel->node.head, free_channel_rcu);
end:
pthread_mutex_unlock(&channel->lock);
* If a local data context is available, notify the threads that the streams'
* state have changed.
*/
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
- struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
uint64_t netidx;
assert(relayd);
- DBG("Cleaning up relayd sockets");
+ DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
/* Save the net sequence index before destroying the object */
netidx = relayd->net_seq_idx;
* memory barrier ordering the updates of the end point status from the
* read of this status which happens AFTER receiving this notify.
*/
- if (ctx) {
- notify_thread_lttng_pipe(ctx->consumer_data_pipe);
- notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
- }
+ notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+ notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
/*
consumer_stream_destroy(stream, metadata_ht);
}
+void consumer_stream_update_channel_attributes(
+ struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel)
+{
+ stream->channel_read_only_attributes.tracefile_size =
+ channel->tracefile_size;
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
- enum lttng_consumer_stream_state state,
const char *channel_name,
- uid_t uid,
- gid_t gid,
uint64_t relayd_id,
uint64_t session_id,
+ struct lttng_trace_chunk *trace_chunk,
int cpu,
int *alloc_ret,
enum consumer_channel_type type,
goto end;
}
- rcu_read_lock();
+ if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
+ ERR("Failed to acquire trace chunk reference during the creation of a stream");
+ ret = -1;
+ goto error;
+ }
+ rcu_read_lock();
stream->key = stream_key;
+ stream->trace_chunk = trace_chunk;
stream->out_fd = -1;
stream->out_fd_offset = 0;
stream->output_written = 0;
- stream->state = state;
- stream->uid = uid;
- stream->gid = gid;
stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
- stream->index_fd = -1;
+ stream->index_file = NULL;
stream->last_sequence_number = -1ULL;
+ stream->rotate_position = -1ULL;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
error:
rcu_read_unlock();
+ lttng_trace_chunk_put(stream->trace_chunk);
free(stream);
end:
if (alloc_ret) {
/*
* Add a stream to the global list protected by a mutex.
*/
-int consumer_add_data_stream(struct lttng_consumer_stream *stream)
+void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
struct lttng_ht *ht = data_ht;
- int ret = 0;
assert(stream);
assert(ht);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
-
- return ret;
}
void consumer_del_data_stream(struct lttng_consumer_stream *stream)
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
- stream->chan->tracefile_size, stream->chan->tracefile_count);
+ stream->chan->tracefile_size,
+ stream->chan->tracefile_count,
+ stream->trace_chunk);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto end;
}
ret = relayd_streams_sent(&relayd->control_sock);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto end;
}
} else {
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
+
/*
* Note that net_seq_num below is assigned with the *current* value of
* next_net_seq_num and only after that the next_net_seq_num will be
return outfd;
}
+/*
+ * Trigger a dump of the metadata content. Following/during the succesful
+ * completion of this call, the metadata poll thread will start receiving
+ * metadata packets to consume.
+ *
+ * The caller must hold the channel and stream locks.
+ */
+static
+int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ASSERT_LOCKED(stream->chan->lock);
+ ASSERT_LOCKED(stream->lock);
+ assert(stream->metadata_flag);
+ assert(stream->chan->trace_chunk);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ /*
+ * Reset the position of what has been read from the
+ * metadata cache to 0 so we can dump it again.
+ */
+ ret = kernctl_metadata_cache_dump(stream->wait_fd);
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Reset the position pushed from the metadata cache so it
+ * will write from the beginning on the next push.
+ */
+ stream->ust_metadata_pushed = 0;
+ ret = consumer_metadata_wakeup_pipe(stream->chan);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+ if (ret < 0) {
+ ERR("Failed to dump the metadata cache");
+ }
+ return ret;
+}
+
+static
+int lttng_consumer_channel_set_trace_chunk(
+ struct lttng_consumer_channel *channel,
+ struct lttng_trace_chunk *new_trace_chunk)
+{
+ pthread_mutex_lock(&channel->lock);
+ if (channel->is_deleted) {
+ /*
+ * The channel has been logically deleted and should no longer
+ * be used. It has released its reference to its current trace
+ * chunk and should not acquire a new one.
+ *
+ * Return success as there is nothing for the caller to do.
+ */
+ goto end;
+ }
+
+ /*
+ * The acquisition of the reference cannot fail (barring
+ * a severe internal error) since a reference to the published
+ * chunk is already held by the caller.
+ */
+ if (new_trace_chunk) {
+ const bool acquired_reference = lttng_trace_chunk_get(
+ new_trace_chunk);
+
+ assert(acquired_reference);
+ }
+
+ lttng_trace_chunk_put(channel->trace_chunk);
+ channel->trace_chunk = new_trace_chunk;
+end:
+ pthread_mutex_unlock(&channel->lock);
+ return 0;
+}
+
/*
* Allocate and return a new lttng_consumer_channel object using the given key
* to initialize the hash table node.
*/
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
+ const uint64_t *chunk_id,
const char *pathname,
const char *name,
- uid_t uid,
- gid_t gid,
uint64_t relayd_id,
enum lttng_event_output output,
uint64_t tracefile_size,
const char *root_shm_path,
const char *shm_path)
{
- struct lttng_consumer_channel *channel;
+ struct lttng_consumer_channel *channel = NULL;
+ struct lttng_trace_chunk *trace_chunk = NULL;
+
+ if (chunk_id) {
+ trace_chunk = lttng_trace_chunk_registry_find_chunk(
+ consumer_data.chunk_registry, session_id,
+ *chunk_id);
+ if (!trace_chunk) {
+ ERR("Failed to find trace chunk reference during creation of channel");
+ goto end;
+ }
+ }
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
channel->refcount = 0;
channel->session_id = session_id;
channel->session_id_per_pid = session_id_per_pid;
- channel->uid = uid;
- channel->gid = gid;
channel->relayd_id = relayd_id;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
}
lttng_ht_node_init_u64(&channel->node, channel->key);
+ lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
+ channel->session_id);
channel->wait_fd = -1;
-
CDS_INIT_LIST_HEAD(&channel->streams.head);
+ if (trace_chunk) {
+ int ret = lttng_consumer_channel_set_trace_chunk(channel,
+ trace_chunk);
+ if (ret) {
+ goto error;
+ }
+ }
+
DBG("Allocated channel (key %" PRIu64 ")", channel->key);
end:
+ lttng_trace_chunk_put(trace_chunk);
return channel;
+error:
+ consumer_del_channel(channel);
+ channel = NULL;
+ goto end;
}
/*
rcu_read_lock();
lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
+ lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
+ &channel->channels_by_session_id_ht_node);
rcu_read_unlock();
+ channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
*/
static int update_poll_array(struct lttng_consumer_local_data *ctx,
struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
- struct lttng_ht *ht)
+ struct lttng_ht *ht, int *nb_inactive_fd)
{
int i = 0;
struct lttng_ht_iter iter;
assert(local_stream);
DBG("Updating poll fd array");
+ *nb_inactive_fd = 0;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
/*
* just after the check. However, this is OK since the stream(s) will
* be deleted once the thread is notified that the end point state has
* changed where this function will be called back again.
+ *
+ * We track the number of inactive FDs because they still need to be
+ * closed by the polling thread after a wakeup on the data_pipe or
+ * metadata_pipe.
*/
- if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
- stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ (*nb_inactive_fd)++;
continue;
}
/*
* This clobbers way too much the debug output. Uncomment that if you
* need it for debugging purposes.
- *
- * DBG("Active FD %d", stream->wait_fd);
*/
(*pollfd)[i].fd = stream->wait_fd;
(*pollfd)[i].events = POLLIN | POLLPRI;
{
struct lttng_ht_iter iter;
struct lttng_consumer_channel *channel;
+ unsigned int trace_chunks_left;
rcu_read_lock();
rcu_read_unlock();
lttng_ht_destroy(consumer_data.channel_ht);
+ lttng_ht_destroy(consumer_data.channels_by_session_id_ht);
cleanup_relayd_ht();
* it.
*/
lttng_ht_destroy(consumer_data.stream_list_ht);
+
+ /*
+ * Trace chunks in the registry may still exist if the session
+ * daemon has encountered an internal error and could not
+ * tear down its sessions and/or trace chunks properly.
+ *
+ * Release the session daemon's implicit reference to any remaining
+ * trace chunk and print an error if any trace chunk was found. Note
+ * that there are _no_ legitimate cases for trace chunks to be left,
+ * it is a leak. However, it can happen following a crash of the
+ * session daemon and not emptying the registry would cause an assertion
+ * to hit.
+ */
+ trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
+ consumer_data.chunk_registry);
+ if (trace_chunks_left) {
+ ERR("%u trace chunks are leaked by lttng-consumerd. "
+ "This can be caused by an internal error of the session daemon.",
+ trace_chunks_left);
+ }
+ /* Run all callbacks freeing each chunk. */
+ rcu_barrier();
+ lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
}
/*
{
ssize_t ret;
- consumer_quit = 1;
+ CMM_STORE_SHARED(consumer_quit, 1);
ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
if (ret < 1) {
PERROR("write consumer quit");
DBG("Consumer flag that it should quit");
}
+
+/*
+ * Flush pending writes to trace output disk file.
+ */
+static
void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
off_t orig_offset)
{
goto error_metadata_pipe;
}
+ ctx->channel_monitor_pipe = -1;
+
return ctx;
error_metadata_pipe:
*/
static int write_relayd_metadata_id(int fd,
struct lttng_consumer_stream *stream,
- struct consumer_relayd_sock_pair *relayd, unsigned long padding)
+ unsigned long padding)
{
ssize_t ret;
struct lttcomm_relayd_metadata_payload hdr;
* core function for writing trace buffers to either the local filesystem or
* the network.
*
- * It must be called with the stream lock held.
+ * It must be called with the stream and the channel lock held.
*
* Careful review MUST be put if any changes occur!
*
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ assert(stream->net_seq_idx != (uint64_t) -1ULL ||
+ stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
mmap_base = stream->mmap_base;
ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
if (ret < 0) {
- ret = -errno;
PERROR("tracer ctl get_mmap_read_offset");
goto end;
}
/* Write metadata stream id before payload */
if (stream->metadata_flag) {
- ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+ ret = write_relayd_metadata_id(outfd, stream, padding);
if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
if (stream->chan->tracefile_size > 0 &&
(stream->tracefile_size_current + len) >
stream->chan->tracefile_size) {
- ret = utils_rotate_stream_file(stream->chan->pathname,
- stream->name, stream->chan->tracefile_size,
- stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current),
- &stream->out_fd);
- if (ret < 0) {
- ERR("Rotating output file");
+ ret = consumer_stream_rotate_output_files(stream);
+ if (ret) {
goto end;
}
outfd = stream->out_fd;
-
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
- stream->name, stream->uid, stream->gid,
- stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- goto end;
- }
- stream->index_fd = ret;
- }
-
- /* Reset current size because we just perform a rotation. */
- stream->tracefile_size_current = 0;
- stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
relayd_hang_up = 1;
/* Socket operation failed. We consider the relayd dead */
- if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
+ if (errno == EPIPE) {
/*
* This is possible if the fd is closed on the other side
* (outfd) or any write problem. It can be verbose a bit for a
lttng_sync_file_range(outfd, stream->out_fd_offset, len,
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += len;
+ lttng_consumer_sync_trace_file(stream, orig_offset);
}
- lttng_consumer_sync_trace_file(stream, orig_offset);
write_error:
/*
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
}
end:
}
stream->reset_metadata_flag = 0;
}
- ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
+ ret = write_relayd_metadata_id(splice_pipe[1], stream,
padding);
if (ret < 0) {
written = ret;
if (stream->chan->tracefile_size > 0 &&
(stream->tracefile_size_current + len) >
stream->chan->tracefile_size) {
- ret = utils_rotate_stream_file(stream->chan->pathname,
- stream->name, stream->chan->tracefile_size,
- stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current),
- &stream->out_fd);
+ ret = consumer_stream_rotate_output_files(stream);
if (ret < 0) {
written = ret;
- ERR("Rotating output file");
goto end;
}
outfd = stream->out_fd;
-
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
- stream->name, stream->uid, stream->gid,
- stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- written = ret;
- goto end;
- }
- stream->index_fd = ret;
- }
-
- /* Reset current size because we just perform a rotation. */
- stream->tracefile_size_current = 0;
- stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
stream->output_written += ret_splice;
written += ret_splice;
}
- lttng_consumer_sync_trace_file(stream, orig_offset);
+ if (!relayd) {
+ lttng_consumer_sync_trace_file(stream, orig_offset);
+ }
goto end;
write_error:
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
/* Skip splice error so the consumer does not fail */
goto end;
}
return written;
}
+/*
+ * Sample the snapshot positions for a specific fd
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_sample_snapshot_positions(stream);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_sample_snapshot_positions(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
/*
* Take a snapshot for a specific fd
*
}
}
+/*
+ * Get the consumed position (free-running counter position in bytes).
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_get_consumed_snapshot(stream, pos);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
- struct lttng_consumer_channel *free_chan = NULL;
+ struct lttng_consumer_channel *channel = NULL;
+ bool free_channel = false;
assert(stream);
/*
DBG3("Consumer delete metadata stream %d", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
- pthread_mutex_lock(&stream->chan->lock);
+ /*
+ * Note that this assumes that a stream's channel is never changed and
+ * that the stream's lock doesn't need to be taken to sample its
+ * channel.
+ */
+ channel = stream->chan;
+ pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&stream->lock);
+ if (channel->metadata_cache) {
+ /* Only applicable to userspace consumers. */
+ pthread_mutex_lock(&channel->metadata_cache->lock);
+ }
/* Remove any reference to that stream. */
consumer_stream_delete(stream, ht);
consumer_stream_destroy_buffers(stream);
/* Atomically decrement channel refcount since other threads can use it. */
- if (!uatomic_sub_return(&stream->chan->refcount, 1)
- && !uatomic_read(&stream->chan->nb_init_stream_left)) {
+ if (!uatomic_sub_return(&channel->refcount, 1)
+ && !uatomic_read(&channel->nb_init_stream_left)) {
/* Go for channel deletion! */
- free_chan = stream->chan;
+ free_channel = true;
}
+ stream->chan = NULL;
/*
* Nullify the stream reference so it is not used after deletion. The
* channel lock MUST be acquired before being able to check for a NULL
* pointer value.
*/
- stream->chan->metadata_stream = NULL;
+ channel->metadata_stream = NULL;
+ if (channel->metadata_cache) {
+ pthread_mutex_unlock(&channel->metadata_cache->lock);
+ }
pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
- if (free_chan) {
- consumer_del_channel(free_chan);
+ if (free_channel) {
+ consumer_del_channel(channel);
}
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
consumer_stream_free(stream);
}
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
+void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
struct lttng_ht *ht = metadata_ht;
- int ret = 0;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+ lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
&stream->node_channel_id);
/*
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&consumer_data.lock);
- return ret;
}
/*
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & LPOLLIN) {
ssize_t pipe_len;
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
- * a negative len, it means an error occured thus we
+ * a negative len, it means an error occurred thus we
* simply remove it from the poll set and free the
* stream.
*/
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
- * a negative len, it means an error occured thus we
+ * a negative len, it means an error occurred thus we
* simply remove it from the poll set and free the
* stream.
*/
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
+ /* 2 for the consumer_data_pipe and wake up pipe */
+ const int nb_pipes_fd = 2;
+ /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
+ int nb_inactive_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
free(local_stream);
local_stream = NULL;
- /*
- * Allocate for all fds +1 for the consumer_data_pipe and +1 for
- * wake up pipe.
- */
- pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+ /* Allocate for all fds */
+ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + 2) *
+ local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
goto end;
}
ret = update_poll_array(ctx, &pollfd, local_stream,
- data_ht);
+ data_ht, &nb_inactive_fd);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
pthread_mutex_unlock(&consumer_data.lock);
/* No FDs and consumer_quit, consumer_cleanup the thread */
- if (nb_fd == 0 && consumer_quit == 1) {
+ if (nb_fd == 0 && nb_inactive_fd == 0 &&
+ CMM_LOAD_SHARED(consumer_quit) == 1) {
err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 2);
+ DBG("polling on %d fd", nb_fd + nb_pipes_fd);
+ if (testpoint(consumerd_thread_data_poll)) {
+ goto end;
+ }
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 2, -1);
+ num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
goto end;
}
+ if (caa_unlikely(data_consumption_paused)) {
+ DBG("Data consumption paused, sleeping...");
+ sleep(1);
+ goto restart;
+ }
+
/*
* If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
if (pollfd == ctx->consumer_channel_pipe[0]) {
if (revents & LPOLLIN) {
enum consumer_channel_action action;
err = 0;
goto end;
}
- if (consumer_quit) {
+ if (CMM_LOAD_SHARED(consumer_quit)) {
DBG("consumer_thread_receive_fds received quit from signal");
err = 0; /* All is OK */
goto end;
* when all fds have hung up, the polling thread
* can exit cleanly
*/
- consumer_quit = 1;
+ CMM_STORE_SHARED(consumer_quit, 1);
/*
* Notify the data poll thread to poll back again and test the
{
ssize_t ret;
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
if (stream->metadata_flag) {
pthread_mutex_lock(&stream->metadata_rdv_lock);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+
return ret;
}
goto error;
}
+ consumer_data.channels_by_session_id_ht =
+ lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.channels_by_session_id_ht) {
+ goto error;
+ }
+
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!consumer_data.relayd_ht) {
goto error;
goto error;
}
+ consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
+ if (!consumer_data.chunk_registry) {
+ goto error;
+ }
+
return 0;
error:
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
-int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
+ void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
- ret = -ENOMEM;
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
} else {
if (ret) {
/* Needing to exit in the middle of a command: error. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
- ret = -EINTR;
goto error_nosignal;
}
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- ret = -1;
fd = -1; /* Just in case it gets set with an invalid value. */
/*
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
- fd = -1; /* For error path */
/* Assign version values. */
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
- fd = -1; /* for eventual error paths */
/* Assign version values. */
relayd->data_sock.major = relayd_sock->major;
relayd->data_sock.minor = relayd_sock->minor;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
- ret = -1;
ret_code = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
+ /*
+ * We gave the ownership of the fd to the relayd structure. Set the
+ * fd to -1 so we don't call close() on it in the error path below.
+ */
+ fd = -1;
/* We successfully added the socket. Send status back. */
ret = consumer_send_status_msg(sock, ret_code);
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
+ relayd->ctx = ctx;
add_relayd(relayd);
/* All good! */
- return 0;
+ return;
error:
if (consumer_send_status_msg(sock, ret_code) < 0) {
if (relayd_created) {
free(relayd);
}
-
- return ret;
-}
-
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
-
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret) {
- /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
- ret = 0;
- goto end;
- }
-
- ret = 1;
-
-end:
- return ret;
}
/*
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
- relayd = find_relayd_by_session_id(id);
- if (relayd) {
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_begin_data_pending(&relayd->control_sock,
- relayd->relayd_session_id);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- /* Communication error thus the relayd so no data pending. */
- goto data_not_pending;
- }
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&id, lttng_ht_seed),
ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
- /* If this call fails, the stream is being used hence data pending. */
- ret = stream_try_lock(stream);
- if (!ret) {
- goto data_pending;
- }
+ pthread_mutex_lock(&stream->lock);
/*
* A removed node from the hash table indicates that the stream has
}
}
- /* Relayd check */
- if (relayd) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ if (ret < 0) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_pending;
+ } else if (ret < 0) {
+ ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ goto data_not_pending;
}
}
- pthread_mutex_unlock(&stream->lock);
- }
- if (relayd) {
- unsigned int is_data_inflight = 0;
-
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Send end command for data pending. */
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto data_not_pending;
}
if (is_data_inflight) {
}
return start_pos;
}
+
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+{
+ int ret = 0;
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustctl_flush_buffer(stream, producer_active);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Sample the rotate position for all the streams of a channel. If a stream
+ * is already at the rotate position (produced == consumed), we flag it as
+ * ready for rotation. The rotation of ready streams occurs after we have
+ * replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+ uint64_t key, uint64_t relayd_id, uint32_t metadata,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_dynamic_array stream_rotation_positions;
+ uint64_t next_chunk_id, stream_count = 0;
+ enum lttng_trace_chunk_status chunk_status;
+ const bool is_local_trace = relayd_id == -1ULL;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool rotating_to_new_chunk = true;
+
+ DBG("Consumer sample rotate position for channel %" PRIu64, key);
+
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position), NULL);
+
+ rcu_read_lock();
+
+ pthread_mutex_lock(&channel->lock);
+ assert(channel->trace_chunk);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+ &next_chunk_id);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end_unlock_channel;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ unsigned long produced_pos = 0, consumed_pos = 0;
+
+ health_code_update();
+
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
+ }
+
+ /*
+ * Active flush; has no effect if the production position
+ * is at a packet boundary.
+ */
+ ret = consumer_flush_buffer(stream, 1);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+ stream->key);
+ goto end_unlock_stream;
+ }
+
+ ret = lttng_consumer_take_snapshot(stream);
+ if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+ ERR("Failed to sample snapshot position during channel rotation");
+ goto end_unlock_stream;
+ }
+ if (!ret) {
+ ret = lttng_consumer_get_produced_snapshot(stream,
+ &produced_pos);
+ if (ret < 0) {
+ ERR("Failed to sample produced position during channel rotation");
+ goto end_unlock_stream;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position during channel rotation");
+ goto end_unlock_stream;
+ }
+ }
+ /*
+ * Align produced position on the start-of-packet boundary of the first
+ * packet going into the next trace chunk.
+ */
+ produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+ if (consumed_pos == produced_pos) {
+ stream->rotate_ready = true;
+ }
+ /*
+ * The rotation position is based on the packet_seq_num of the
+ * packet following the last packet that was consumed for this
+ * stream, incremented by the offset between produced and
+ * consumed positions. This rotation position is a lower bound
+ * (inclusive) at which the next trace chunk starts. Since it
+ * is a lower bound, it is OK if the packet_seq_num does not
+ * correspond exactly to the same packet identified by the
+ * consumed_pos, which can happen in overwrite mode.
+ */
+ if (stream->sequence_number_unavailable) {
+ /*
+ * Rotation should never be performed on a session which
+ * interacts with a pre-2.8 lttng-modules, which does
+ * not implement packet sequence number.
+ */
+ ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
+ stream->key);
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ stream->rotate_position = stream->last_sequence_number + 1 +
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
+
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk".
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = stream->rotate_position,
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+ stream = NULL;
+ pthread_mutex_unlock(&channel->lock);
+
+ if (is_local_trace) {
+ ret = 0;
+ goto end;
+ }
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end;
+ }
+
+ ret = 0;
+ goto end;
+
+end_unlock_stream:
+ pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
+ pthread_mutex_unlock(&channel->lock);
+end:
+ rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
+ return ret;
+}
+
+/*
+ * Check if a stream is ready to be rotated after extracting it.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
+ */
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
+{
+ if (stream->rotate_ready) {
+ return 1;
+ }
+
+ /*
+ * If packet seq num is unavailable, it means we are interacting
+ * with a pre-2.8 lttng-modules which does not implement the
+ * sequence number. Rotation should never be used by sessiond in this
+ * scenario.
+ */
+ if (stream->sequence_number_unavailable) {
+ ERR("Internal error: rotation used on stream %" PRIu64
+ " with unavailable sequence number",
+ stream->key);
+ return -1;
+ }
+
+ if (stream->rotate_position == -1ULL ||
+ stream->last_sequence_number == -1ULL) {
+ return 0;
+ }
+
+ /*
+ * Rotate position not reached yet. The stream rotate position is
+ * the position of the next packet belonging to the next trace chunk,
+ * but consumerd considers rotation ready when reaching the last
+ * packet of the current chunk, hence the "rotate_position - 1".
+ */
+ if (stream->last_sequence_number >= stream->rotate_position - 1) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * Reset the state for a stream after a rotation occurred.
+ */
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
+{
+ stream->rotate_position = -1ULL;
+ stream->rotate_ready = false;
+}
+
+/*
+ * Perform the rotation a local stream file.
+ */
+static
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+
+ DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
+ stream->key,
+ stream->chan->key);
+ stream->tracefile_size_current = 0;
+ stream->tracefile_count_current = 0;
+
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret) {
+ PERROR("Failed to close stream out_fd of channel \"%s\"",
+ stream->chan->name);
+ }
+ stream->out_fd = -1;
+ }
+
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+
+ if (!stream->trace_chunk) {
+ goto end;
+ }
+
+ ret = consumer_stream_create_output_files(stream, true);
+end:
+ return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the channel and stream locks held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ DBG("Consumer rotate stream %" PRIu64, stream->key);
+
+ /*
+ * Update the stream's 'current' chunk to the session's (channel)
+ * now-current chunk.
+ */
+ lttng_trace_chunk_put(stream->trace_chunk);
+ if (stream->chan->trace_chunk == stream->trace_chunk) {
+ /*
+ * A channel can be rotated and not have a "next" chunk
+ * to transition to. In that case, the channel's "current chunk"
+ * has not been closed yet, but it has not been updated to
+ * a "next" trace chunk either. Hence, the stream, like its
+ * parent channel, becomes part of no chunk and can't output
+ * anything until a new trace chunk is created.
+ */
+ stream->trace_chunk = NULL;
+ } else if (stream->chan->trace_chunk &&
+ !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
+ ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
+ ret = -1;
+ goto error;
+ } else {
+ /*
+ * Update the stream's trace chunk to its parent channel's
+ * current trace chunk.
+ */
+ stream->trace_chunk = stream->chan->trace_chunk;
+ }
+
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
+ ret = rotate_local_stream(ctx, stream);
+ if (ret < 0) {
+ ERR("Failed to rotate stream, ret = %i", ret);
+ goto error;
+ }
+ }
+
+ if (stream->metadata_flag && stream->trace_chunk) {
+ /*
+ * If the stream has transitioned to a new trace
+ * chunk, the metadata should be re-dumped to the
+ * newest chunk.
+ *
+ * However, it is possible for a stream to transition to
+ * a "no-chunk" state. This can happen if a rotation
+ * occurs on an inactive session. In such cases, the metadata
+ * regeneration will happen when the next trace chunk is
+ * created.
+ */
+ ret = consumer_metadata_stream_dump(stream);
+ if (ret) {
+ goto error;
+ }
+ }
+ lttng_consumer_reset_stream_rotate_state(stream);
+
+ ret = 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Rotate all the ready streams now.
+ *
+ * This is especially important for low throughput streams that have already
+ * been consumed, we cannot wait for their next packet to perform the
+ * rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+ uint64_t key, struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+ rcu_read_lock();
+
+ DBG("Consumer rotate ready streams in channel %" PRIu64, key);
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ health_code_update();
+
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->lock);
+
+ if (!stream->rotate_ready) {
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ continue;
+ }
+ DBG("Consumer rotate ready stream %" PRIu64, stream->key);
+
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ if (ret) {
+ goto end;
+ }
+ }
+
+ ret = 0;
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+enum lttcomm_return_code lttng_consumer_init_command(
+ struct lttng_consumer_local_data *ctx,
+ const lttng_uuid sessiond_uuid)
+{
+ enum lttcomm_return_code ret;
+ char uuid_str[UUID_STR_LEN];
+
+ if (ctx->sessiond_uuid.is_set) {
+ ret = LTTCOMM_CONSUMERD_ALREADY_SET;
+ goto end;
+ }
+
+ ctx->sessiond_uuid.is_set = true;
+ memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+ lttng_uuid_to_str(sessiond_uuid, uuid_str);
+ DBG("Received session daemon UUID: %s", uuid_str);
+end:
+ return ret;
+}
+
+enum lttcomm_return_code lttng_consumer_create_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle)
+{
+ int ret;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
+ enum lttng_trace_chunk_status chunk_status;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ char creation_timestamp_buffer[ISO8601_STR_LEN];
+ const char *relayd_id_str = "(none)";
+ const char *creation_timestamp_str;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+
+ if (relayd_id) {
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ /* Local protocol error. */
+ assert(chunk_creation_timestamp);
+ ret = time_to_iso8601_str(chunk_creation_timestamp,
+ creation_timestamp_buffer,
+ sizeof(creation_timestamp_buffer));
+ creation_timestamp_str = !ret ? creation_timestamp_buffer :
+ "(formatting error)";
+
+ DBG("Consumer create trace chunk command: relay_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", chunk_override_name = %s"
+ ", chunk_creation_timestamp = %s",
+ relayd_id_str, session_id, chunk_id,
+ chunk_override_name ? : "(none)",
+ creation_timestamp_str);
+
+ /*
+ * The trace chunk registry, as used by the consumer daemon, implicitly
+ * owns the trace chunks. This is only needed in the consumer since
+ * the consumer has no notion of a session beyond session IDs being
+ * used to identify other objects.
+ *
+ * The lttng_trace_chunk_registry_publish() call below provides a
+ * reference which is not released; it implicitly becomes the session
+ * daemon's reference to the chunk in the consumer daemon.
+ *
+ * The lifetime of trace chunks in the consumer daemon is managed by
+ * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
+ * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
+ */
+ created_chunk = lttng_trace_chunk_create(chunk_id,
+ chunk_creation_timestamp);
+ if (!created_chunk) {
+ ERR("Failed to create trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+
+ if (chunk_override_name) {
+ chunk_status = lttng_trace_chunk_override_name(created_chunk,
+ chunk_override_name);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+
+ if (chunk_directory_handle) {
+ chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
+ credentials);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk credentials");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ /*
+ * The consumer daemon has no ownership of the chunk output
+ * directory.
+ */
+ chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
+ chunk_directory_handle);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk's directory handle");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+
+ published_chunk = lttng_trace_chunk_registry_publish_chunk(
+ consumer_data.chunk_registry, session_id,
+ created_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ created_chunk = NULL;
+ if (!published_chunk) {
+ ERR("Failed to publish trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
+ consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id, &iter.iter, channel,
+ channels_by_session_id_ht_node.node) {
+ ret = lttng_consumer_channel_set_trace_chunk(channel,
+ published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id, chunk_id,
+ chunk_creation_timestamp, NULL,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
+ }
+ }
+
+ if (relayd_id) {
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_create_trace_chunk(
+ &relayd->control_sock, published_chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ NULL, path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
+ rcu_read_unlock();
+error:
+ /* Release the reference returned by the "publish" operation. */
+ lttng_trace_chunk_put(published_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ return ret_code;
+}
+
+enum lttcomm_return_code lttng_consumer_close_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path)
+{
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *chunk;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ const char *relayd_id_str = "(none)";
+ const char *close_command_name = "none";
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+ enum lttng_trace_chunk_status chunk_status;
+
+ if (relayd_id) {
+ int ret;
+
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+ if (close_command) {
+ close_command_name = lttng_trace_chunk_command_type_get_name(
+ *close_command);
+ }
+
+ DBG("Consumer close trace chunk command: relayd_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", close command = %s",
+ relayd_id_str, session_id, chunk_id,
+ close_command_name);
+
+ chunk = lttng_trace_chunk_registry_find_chunk(
+ consumer_data.chunk_registry, session_id, chunk_id);
+ if (!chunk) {
+ ERR("Failed to find chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
+ chunk_close_timestamp);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+
+ if (close_command) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, *close_command);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ }
+
+ /*
+ * chunk is now invalid to access as we no longer hold a reference to
+ * it; it is only kept around to compare it (by address) to the
+ * current chunk found in the session's channels.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter,
+ channel, node.node) {
+ int ret;
+
+ /*
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
+ */
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
+ if (ret) {
+ /*
+ * Attempt to close the chunk on as many channels as
+ * possible.
+ */
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ }
+ }
+
+ if (relayd_id) {
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_close_trace_chunk(
+ &relayd->control_sock, chunk,
+ path);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+ *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
+ rcu_read_unlock();
+end:
+ /*
+ * Release the reference returned by the "find" operation and
+ * the session daemon's implicit reference to the chunk.
+ */
+ lttng_trace_chunk_put(chunk);
+ lttng_trace_chunk_put(chunk);
+
+ return ret_code;
+}
+
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id)
+{
+ int ret;
+ enum lttcomm_return_code ret_code;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ const char *relayd_id_str = "(none)";
+ const bool is_local_trace = !relayd_id;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool chunk_exists_local, chunk_exists_remote;
+
+ if (relayd_id) {
+ int ret;
+
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ DBG("Consumer trace chunk exists command: relayd_id = %s"
+ ", chunk_id = %" PRIu64, relayd_id_str,
+ chunk_id);
+ ret = lttng_trace_chunk_registry_chunk_exists(
+ consumer_data.chunk_registry, session_id,
+ chunk_id, &chunk_exists_local);
+ if (ret) {
+ /* Internal error. */
+ ERR("Failed to query the existence of a trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_FATAL;
+ goto end;
+ }
+ DBG("Trace chunk %s locally",
+ chunk_exists_local ? "exists" : "does not exist");
+ if (chunk_exists_local) {
+ ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+ goto end;
+ } else if (is_local_trace) {
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ goto end;
+ }
+
+ rcu_read_lock();
+ relayd = consumer_find_relayd(*relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, *relayd_id);
+ ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end_rcu_unlock;
+ }
+ DBG("Looking up existence of trace chunk on relay daemon");
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+ &chunk_exists_remote);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Failed to look-up the existence of trace chunk on relay daemon");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ goto end_rcu_unlock;
+ }
+
+ ret_code = chunk_exists_remote ?
+ LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
+ LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ DBG("Trace chunk %s on relay daemon",
+ chunk_exists_remote ? "exists" : "does not exist");
+
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ return ret_code;
+}