*/
#define _LGPL_SOURCE
-#include <inttypes.h>
-#include <poll.h>
-#include <pthread.h>
-#include <signal.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/mman.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <bin/lttng-consumerd/health-consumerd.hpp>
#include <common/align.hpp>
#include <common/common.hpp>
#include <common/compat/endian.hpp>
#include <common/ust-consumer/ust-consumer.hpp>
#include <common/utils.hpp>
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <inttypes.h>
+#include <poll.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
lttng_consumer_global_data the_consumer_data;
enum consumer_channel_action {
namespace {
struct consumer_channel_msg {
enum consumer_channel_action action;
- struct lttng_consumer_channel *chan; /* add */
- uint64_t key; /* del */
+ struct lttng_consumer_channel *chan; /* add */
+ uint64_t key; /* del */
};
/*
}
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *chan,
- uint64_t key,
- enum consumer_channel_action action)
+ struct lttng_consumer_channel *chan,
+ uint64_t key,
+ enum consumer_channel_action action)
{
struct consumer_channel_msg msg;
ssize_t ret;
}
}
-void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
- uint64_t key)
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel **chan,
- uint64_t *key,
- enum consumer_channel_action *action)
+ struct lttng_consumer_channel **chan,
+ uint64_t *key,
+ enum consumer_channel_action *action)
{
struct consumer_channel_msg msg;
ssize_t ret;
LTTNG_ASSERT(channel);
/* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
+ cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
/*
* Once a stream is added to this list, the buffers were created so we
* have a guarantee that this call will succeed. Setting the monitor
* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
-static struct lttng_consumer_stream *find_stream(uint64_t key,
- struct lttng_ht *ht)
+static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
{
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
static void free_channel_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_u64 *node =
- lttng::utils::container_of(head, &lttng_ht_node_u64::head);
+ struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
struct lttng_consumer_channel *channel =
lttng::utils::container_of(node, &lttng_consumer_channel::node);
*/
static void free_relayd_rcu(struct rcu_head *head)
{
- struct lttng_ht_node_u64 *node =
- lttng::utils::container_of(head, &lttng_ht_node_u64::head);
+ struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
struct consumer_relayd_sock_pair *relayd =
lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
LTTNG_ASSERT(!ret);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
- ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht,
- &iter);
+ ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
LTTNG_ASSERT(!ret);
rcu_read_unlock();
}
rcu_read_lock();
- cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
- relayd, node.node) {
+ cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
consumer_destroy_relayd(relayd);
}
* because we handle the write/read race with a pipe wakeup for each thread.
*/
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
- enum consumer_endpoint_status status)
+ enum consumer_endpoint_status status)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
rcu_read_lock();
/* Let's begin with metadata */
- cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
/* Follow up by the data streams */
- cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
DBG("Delete flag set to data stream %d", stream->wait_fd);
*
* One this call returns, the stream object is not longer usable nor visible.
*/
-void consumer_del_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
consumer_stream_destroy(stream, ht);
}
consumer_stream_destroy(stream, metadata_ht);
}
-void consumer_stream_update_channel_attributes(
- struct lttng_consumer_stream *stream,
- struct lttng_consumer_channel *channel)
+void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel)
{
- stream->channel_read_only_attributes.tracefile_size =
- channel->tracefile_size;
+ stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
}
/*
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht,
- &stream->node_channel_id);
+ lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_u64(the_consumer_data.stream_list_ht,
- &stream->node_session_id);
+ lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
LTTNG_ASSERT(relayd);
ASSERT_RCU_READ_LOCKED();
- lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx,
- &iter);
+ lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
goto end;
/*
* Allocate and return a consumer relayd socket.
*/
-static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
- uint64_t net_seq_idx)
+static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
{
struct consumer_relayd_sock_pair *obj = NULL;
*
* Returns 0 on success, < 0 on error
*/
-int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
- char *path)
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
if (relayd != NULL) {
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_add_stream(&relayd->control_sock, stream->name,
- get_consumer_domain(), path, &stream->relayd_stream_id,
- stream->chan->tracefile_size,
- stream->chan->tracefile_count,
- stream->trace_chunk);
+ ret = relayd_add_stream(&relayd->control_sock,
+ stream->name,
+ get_consumer_domain(),
+ path,
+ &stream->relayd_stream_id,
+ stream->chan->tracefile_size,
+ stream->chan->tracefile_count,
+ stream->trace_chunk);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto end;
}
stream->sent_to_relayd = 1;
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
- stream->key, stream->net_seq_idx);
+ stream->key,
+ stream->net_seq_idx);
ret = -1;
goto end;
}
DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
- stream->name, stream->key, stream->net_seq_idx);
+ stream->name,
+ stream->key,
+ stream->net_seq_idx);
end:
rcu_read_unlock();
ret = relayd_streams_sent(&relayd->control_sock);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto end;
}
} else {
- ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
- net_seq_idx);
+ ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
ret = -1;
goto end;
}
* Return destination file descriptor or negative value on error.
*/
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
- size_t data_size, unsigned long padding,
- struct consumer_relayd_sock_pair *relayd)
+ size_t data_size,
+ unsigned long padding,
+ struct consumer_relayd_sock_pair *relayd)
{
int outfd = -1, ret;
struct lttcomm_relayd_data_hdr data_hdr;
data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
- ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
- sizeof(data_hdr));
+ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
if (ret < 0) {
goto error;
}
{
int ret = 0;
- DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
- channel->name);
+ DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
if (channel->monitor && channel->metadata_stream) {
const char dummy = 'c';
- const ssize_t write_ret = lttng_write(
- channel->metadata_stream->ust_metadata_poll_pipe[1],
- &dummy, 1);
+ const ssize_t write_ret =
+ lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);
if (write_ret < 1) {
if (errno == EWOULDBLOCK) {
*
* The caller must hold the channel and stream locks.
*/
-static
-int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
+static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
int ret;
return ret;
}
-static
-int lttng_consumer_channel_set_trace_chunk(
- struct lttng_consumer_channel *channel,
- struct lttng_trace_chunk *new_trace_chunk)
+static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
+ struct lttng_trace_chunk *new_trace_chunk)
{
pthread_mutex_lock(&channel->lock);
if (channel->is_deleted) {
* chunk is already held by the caller.
*/
if (new_trace_chunk) {
- const bool acquired_reference = lttng_trace_chunk_get(
- new_trace_chunk);
+ const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
LTTNG_ASSERT(acquired_reference);
}
* On error, return NULL.
*/
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
- uint64_t session_id,
- const uint64_t *chunk_id,
- const char *pathname,
- const char *name,
- uint64_t relayd_id,
- enum lttng_event_output output,
- uint64_t tracefile_size,
- uint64_t tracefile_count,
- uint64_t session_id_per_pid,
- unsigned int monitor,
- unsigned int live_timer_interval,
- bool is_in_live_session,
- const char *root_shm_path,
- const char *shm_path)
+ uint64_t session_id,
+ const uint64_t *chunk_id,
+ const char *pathname,
+ const char *name,
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor,
+ unsigned int live_timer_interval,
+ bool is_in_live_session,
+ const char *root_shm_path,
+ const char *shm_path)
{
struct lttng_consumer_channel *channel = NULL;
struct lttng_trace_chunk *trace_chunk = NULL;
if (chunk_id) {
trace_chunk = lttng_trace_chunk_registry_find_chunk(
- the_consumer_data.chunk_registry, session_id,
- *chunk_id);
+ the_consumer_data.chunk_registry, session_id, *chunk_id);
if (!trace_chunk) {
ERR("Failed to find trace chunk reference during creation of channel");
goto end;
}
lttng_ht_node_init_u64(&channel->node, channel->key);
- lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
- channel->session_id);
+ lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);
channel->wait_fd = -1;
CDS_INIT_LIST_HEAD(&channel->streams.head);
if (trace_chunk) {
- int ret = lttng_consumer_channel_set_trace_chunk(channel,
- trace_chunk);
+ int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
if (ret) {
goto error;
}
* Always return 0 indicating success.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx)
{
pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&channel->lock);
rcu_read_lock();
lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
- &channel->channels_by_session_id_ht_node);
+ &channel->channels_by_session_id_ht_node);
rcu_read_unlock();
channel->is_published = true;
* Returns the number of fds in the structures.
*/
static int update_poll_array(struct lttng_consumer_local_data *ctx,
- struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
- struct lttng_ht *ht, int *nb_inactive_fd)
+ struct pollfd **pollfd,
+ struct lttng_consumer_stream **local_stream,
+ struct lttng_ht *ht,
+ int *nb_inactive_fd)
{
int i = 0;
struct lttng_ht_iter iter;
DBG("Updating poll fd array");
*nb_inactive_fd = 0;
rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
/*
* Only active streams with an active end point can be added to the
* poll set and local stream storage of the thread.
/*
* Set the error socket.
*/
-void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
- int sock)
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
{
ctx->consumer_error_socket = sock;
}
/*
* Set the command socket path.
*/
-void lttng_consumer_set_command_sock_path(
- struct lttng_consumer_local_data *ctx, char *sock)
+void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
{
ctx->consumer_command_sock_path = sock;
}
int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
if (ctx->consumer_error_socket > 0) {
- return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
- sizeof(enum lttcomm_sessiond_command));
+ return lttcomm_send_unix_sock(
+ ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
}
return 0;
rcu_read_lock();
- cds_lfht_for_each_entry(the_consumer_data.channel_ht->ht, &iter.iter,
- channel, node.node) {
+ cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
consumer_del_channel(channel);
}
* session daemon and not emptying the registry would cause an assertion
* to hit.
*/
- trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
- the_consumer_data.chunk_registry);
+ trace_chunks_left =
+ lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
if (trace_chunks_left) {
ERR("%u trace chunks are leaked by lttng-consumerd. "
- "This can be caused by an internal error of the session daemon.",
- trace_chunks_left);
+ "This can be caused by an internal error of the session daemon.",
+ trace_chunks_left);
}
/* Run all callbacks freeing each chunk. */
rcu_barrier();
DBG("Consumer flag that it should quit");
}
-
/*
* Flush pending writes to trace output disk file.
*/
-static
-void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
- off_t orig_offset)
+static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
int ret;
int outfd = stream->out_fd;
if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
- stream->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE
- | SYNC_FILE_RANGE_WRITE
- | SYNC_FILE_RANGE_WAIT_AFTER);
+ lttng_sync_file_range(outfd,
+ orig_offset - stream->max_sb_size,
+ stream->max_sb_size,
+ SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
+ SYNC_FILE_RANGE_WAIT_AFTER);
/*
* Give hints to the kernel about how we access the file:
* POSIX_FADV_DONTNEED : we won't re-access data in a near future after
* defined. So it can be expected to lead to lower throughput in
* streaming.
*/
- ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
- stream->max_sb_size, POSIX_FADV_DONTNEED);
+ ret = posix_fadvise(
+ outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
if (ret && ret != -ENOSYS) {
errno = ret;
PERROR("posix_fadvise on fd %i", outfd);
*
* Returns a pointer to the new context or NULL on error.
*/
-struct lttng_consumer_local_data *lttng_consumer_create(
- enum lttng_consumer_type type,
- ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx, bool locked_by_caller),
- int (*recv_channel)(struct lttng_consumer_channel *channel),
- int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(uint64_t stream_key, uint32_t state))
+struct lttng_consumer_local_data *
+lttng_consumer_create(enum lttng_consumer_type type,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller),
+ int (*recv_channel)(struct lttng_consumer_channel *channel),
+ int (*recv_stream)(struct lttng_consumer_stream *stream),
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
{
int ret;
struct lttng_consumer_local_data *ctx;
LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
- the_consumer_data.type == type);
+ the_consumer_data.type == type);
the_consumer_data.type = type;
ctx = zmalloc<lttng_consumer_local_data>();
}
rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
/*
* Ignore return value since we are currently cleaning up so any error
* can't be handled.
}
rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
/*
* Ignore return value since we are currently cleaning up so any error
* can't be handled.
/*
* Write the metadata stream id on the specified file descriptor.
*/
-static int write_relayd_metadata_id(int fd,
- struct lttng_consumer_stream *stream,
- unsigned long padding)
+static int
+write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
{
ssize_t ret;
struct lttcomm_relayd_metadata_payload hdr;
goto end;
}
DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
- stream->relayd_stream_id, padding);
+ stream->relayd_stream_id,
+ padding);
end:
return (int) ret;
*
* Returns the number of bytes written
*/
-ssize_t lttng_consumer_on_read_subbuffer_mmap(
- struct lttng_consumer_stream *stream,
- const struct lttng_buffer_view *buffer,
- unsigned long padding)
+ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
+ const struct lttng_buffer_view *buffer,
+ unsigned long padding)
{
ssize_t ret = 0;
off_t orig_offset = stream->out_fd_offset;
/* RCU lock for the relayd pointer */
rcu_read_lock();
- LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL ||
- stream->trace_chunk);
+ LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->reset_metadata_flag) {
ret = relayd_reset_metadata(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->metadata_version);
+ stream->relayd_stream_id,
+ stream->metadata_version);
if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
* Check if we need to change the tracefile before writing the packet.
*/
if (stream->chan->tracefile_size > 0 &&
- (stream->tracefile_size_current + buffer->size) >
- stream->chan->tracefile_size) {
+ (stream->tracefile_size_current + buffer->size) >
+ stream->chan->tracefile_size) {
ret = consumer_stream_rotate_output_files(stream);
if (ret) {
goto end;
DBG("Consumer mmap write detected relayd hang up");
} else {
/* Unhandled error, print it and stop function right now. */
- PERROR("Error in write mmap (ret %zd != write_len %zu)", ret,
- write_len);
+ PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
}
goto write_error;
}
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, write_len,
- SYNC_FILE_RANGE_WRITE);
+ lttng_sync_file_range(
+ outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
}
*
* Returns the number of bytes spliced.
*/
-ssize_t lttng_consumer_on_read_subbuffer_splice(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding)
+ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream,
+ unsigned long len,
+ unsigned long padding)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
if (stream->reset_metadata_flag) {
ret = relayd_reset_metadata(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->metadata_version);
+ stream->relayd_stream_id,
+ stream->metadata_version);
if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
}
stream->reset_metadata_flag = 0;
}
- ret = write_relayd_metadata_id(splice_pipe[1], stream,
- padding);
+ ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
if (ret < 0) {
written = ret;
relayd_hang_up = 1;
* Check if we need to change the tracefile before writing the packet.
*/
if (stream->chan->tracefile_size > 0 &&
- (stream->tracefile_size_current + len) >
- stream->chan->tracefile_size) {
+ (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
ret = consumer_stream_rotate_output_files(stream);
if (ret < 0) {
written = ret;
while (len > 0) {
DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
- (unsigned long)offset, len, fd, splice_pipe[1]);
- ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
- SPLICE_F_MOVE | SPLICE_F_MORE);
+ (unsigned long) offset,
+ len,
+ fd,
+ splice_pipe[1]);
+ ret_splice = splice(
+ fd, &offset, splice_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe, ret %zd", ret_splice);
if (ret_splice < 0) {
ret = errno;
}
/* Splice data out */
- ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
- ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
- outfd, ret_splice);
+ ret_splice = splice(splice_pipe[0],
+ NULL,
+ outfd,
+ NULL,
+ ret_splice,
+ SPLICE_F_MOVE | SPLICE_F_MORE);
+ DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
if (ret_splice < 0) {
ret = errno;
written = -ret;
*/
ret = errno;
written += ret_splice;
- PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
- len);
+ PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
goto splice_error;
} else {
/* All good, update current len and continue. */
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
- SYNC_FILE_RANGE_WRITE);
+ lttng_sync_file_range(
+ outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret_splice;
}
stream->output_written += ret_splice;
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
/* Skip splice error so the consumer does not fail */
goto end;
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos)
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos)
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
}
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
- int sock, struct pollfd *consumer_sockpoll)
+ int sock,
+ struct pollfd *consumer_sockpoll)
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
}
}
-static
-void lttng_consumer_close_all_metadata(void)
+static void lttng_consumer_close_all_metadata(void)
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
/*
* Clean up a metadata stream and free its memory.
*/
-void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
struct lttng_consumer_channel *channel = NULL;
bool free_channel = false;
consumer_stream_destroy_buffers(stream);
/* Atomically decrement channel refcount since other threads can use it. */
- if (!uatomic_sub_return(&channel->refcount, 1)
- && !uatomic_read(&channel->nb_init_stream_left)) {
+ if (!uatomic_sub_return(&channel->refcount, 1) &&
+ !uatomic_read(&channel->nb_init_stream_left)) {
/* Go for channel deletion! */
free_channel = true;
}
lttng_ht_add_unique_u64(ht, &stream->node);
- lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht,
- &stream->node_channel_id);
+ lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);
/*
* Add stream to the stream_list_ht of the consumer data. No need to steal
* the key since the HT does not use it and we allow to add redundant keys
* into this table.
*/
- lttng_ht_add_u64(the_consumer_data.stream_list_ht,
- &stream->node_session_id);
+ lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
rcu_read_unlock();
DBG("Consumer delete flagged data stream");
rcu_read_lock();
- cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
/*
* Delete metadata stream that are flagged for deletion (endpoint_status).
*/
-static void validate_endpoint_status_metadata_stream(
- struct lttng_poll_event *pollset)
+static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
LTTNG_ASSERT(pollset);
rcu_read_lock();
- cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
goto end_poll;
}
- ret = lttng_poll_add(&events,
- lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
+ ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
if (ret < 0) {
goto end;
}
DBG("Metadata main loop started");
while (1) {
-restart:
+ restart:
health_code_update();
health_poll_entry();
DBG("Metadata poll wait");
ret = lttng_poll_wait(&events, -1);
- DBG("Metadata poll return from wait with %d fd(s)",
- LTTNG_POLL_GETNB(&events));
+ DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
health_poll_exit();
DBG("Metadata event caught in thread");
if (ret < 0) {
goto restart;
}
if (LTTNG_POLL_GETNB(&events) == 0) {
- err = 0; /* All is OK */
+ err = 0; /* All is OK */
}
goto end;
}
ssize_t pipe_len;
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
- &stream, sizeof(stream));
+ &stream,
+ sizeof(stream));
if (pipe_len < sizeof(stream)) {
if (pipe_len < 0) {
PERROR("read metadata stream");
}
/*
- * Remove the pipe from the poll set and continue the loop
- * since their might be data to consume.
+ * Remove the pipe from the poll set and continue
+ * the loop since their might be data to consume.
*/
- lttng_poll_del(&events,
- lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+ lttng_poll_del(
+ &events,
+ lttng_pipe_get_readfd(
+ ctx->consumer_metadata_pipe));
lttng_pipe_read_close(ctx->consumer_metadata_pipe);
continue;
}
}
DBG("Adding metadata stream %d to poll set",
- stream->wait_fd);
+ stream->wait_fd);
/* Add metadata stream to the global poll events list */
- lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI);
- }else if (revents & (LPOLLERR | LPOLLHUP)) {
+ lttng_poll_add(
+ &events, stream->wait_fd, LPOLLIN | LPOLLPRI);
+ } else if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Metadata thread pipe hung up");
/*
* Remove the pipe from the poll set and continue the loop
* since their might be data to consume.
*/
- lttng_poll_del(&events,
- lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+ lttng_poll_del(
+ &events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
lttng_pipe_read_close(ctx->consumer_metadata_pipe);
continue;
} else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ ERR("Unexpected poll events %u for sock %d",
+ revents,
+ pollfd);
goto end;
}
node = lttng_ht_iter_get_node_u64(&iter);
LTTNG_ASSERT(node);
- stream = caa_container_of(node, struct lttng_consumer_stream,
- node);
+ stream = caa_container_of(node, struct lttng_consumer_stream, node);
if (revents & (LPOLLIN | LPOLLPRI)) {
/* Get the data out of the metadata file descriptor */
} else if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Metadata fd %d is hup|err.", pollfd);
if (!stream->hangup_flush_done &&
- (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
- the_consumer_data.type ==
- LTTNG_CONSUMER64_UST)) {
+ (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
+ the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("Attempting to flush and consume the UST buffers");
lttng_ustconsumer_on_stream_hangup(stream);
len = ctx->on_buffer_ready(stream, ctx, false);
/*
- * We don't check the return value here since if we get
- * a negative len, it means an error occurred thus we
- * simply remove it from the poll set and free the
- * stream.
+ * We don't check the return value here since if we
+ * get a negative len, it means an error occurred
+ * thus we simply remove it from the poll set and
+ * free the stream.
*/
} while (len > 0);
}
local_stream = NULL;
/* Allocate for all fds */
- pollfd = calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
+ pollfd =
+ calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
- local_stream = calloc<lttng_consumer_stream *>(the_consumer_data.stream_count + nb_pipes_fd);
+ local_stream = calloc<lttng_consumer_stream *>(
+ the_consumer_data.stream_count + nb_pipes_fd);
if (local_stream == NULL) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
- ret = update_poll_array(ctx, &pollfd, local_stream,
- data_ht, &nb_inactive_fd);
+ ret = update_poll_array(
+ ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
pthread_mutex_unlock(&the_consumer_data.lock);
/* No FDs and consumer_quit, consumer_cleanup the thread */
- if (nb_fd == 0 && nb_inactive_fd == 0 &&
- CMM_LOAD_SHARED(consumer_quit) == 1) {
- err = 0; /* All is OK */
+ if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
+ err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
- pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
- &new_stream, sizeof(new_stream));
+ pipe_readlen = lttng_pipe_read(
+ ctx->consumer_data_pipe, &new_stream, sizeof(new_stream));
if (pipe_readlen < sizeof(new_stream)) {
PERROR("Consumer data pipe");
/* Continue so we can at least handle the current stream(s). */
char dummy;
ssize_t pipe_readlen;
- pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
- sizeof(dummy));
+ pipe_readlen =
+ lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
if (pipe_readlen < 0) {
PERROR("Consumer data wakeup pipe");
}
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown =
+ 1;
}
}
}
if (local_stream[i] == NULL) {
continue;
}
- if ((pollfd[i].revents & POLLIN) ||
- local_stream[i]->hangup_flush_done ||
- local_stream[i]->has_data) {
+ if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
+ local_stream[i]->has_data) {
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx, false);
/* it's ok to have an unavailable sub-buffer */
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown =
+ 1;
}
}
}
if (local_stream[i] == NULL) {
continue;
}
- if (!local_stream[i]->hangup_flush_done
- && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
- && (the_consumer_data.type == LTTNG_CONSUMER32_UST
- || the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
+ if (!local_stream[i]->hangup_flush_done &&
+ (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
+ (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
+ the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("fd %d is hup|err|nval. Attempting flush and read.",
- pollfd[i].fd);
+ pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
* allow the poll() on the stream read-side to detect when the
* write-side (application) finally closes them.
*/
-static
-void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
+static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key,
- &iter.iter, stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
/*
* Protect against teardown with mutex.
*/
}
rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
ret = lttng_ht_del(ht, &iter);
LTTNG_ASSERT(ret != 0);
}
DBG("Channel main loop started");
while (1) {
-restart:
+ restart:
health_code_update();
DBG("Channel poll wait");
health_poll_entry();
ret = lttng_poll_wait(&events, -1);
- DBG("Channel poll return from wait with %d fd(s)",
- LTTNG_POLL_GETNB(&events));
+ DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
health_poll_exit();
DBG("Channel event caught in thread");
if (ret < 0) {
goto restart;
}
if (LTTNG_POLL_GETNB(&events) == 0) {
- err = 0; /* All is OK */
+ err = 0; /* All is OK */
}
goto end;
}
if (ret < 0) {
ERR("Error reading channel pipe");
}
- lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ lttng_poll_del(&events,
+ ctx->consumer_channel_pipe[0]);
continue;
}
switch (action) {
case CONSUMER_CHANNEL_ADD:
- DBG("Adding channel %d to poll set",
- chan->wait_fd);
+ DBG("Adding channel %d to poll set", chan->wait_fd);
lttng_ht_node_init_u64(&chan->wait_fd_node,
- chan->wait_fd);
+ chan->wait_fd);
rcu_read_lock();
lttng_ht_add_unique_u64(channel_ht,
- &chan->wait_fd_node);
+ &chan->wait_fd_node);
rcu_read_unlock();
/* Add channel to the global poll events list */
- // FIXME: Empty flag on a pipe pollset, this might hang on FreeBSD.
+ // FIXME: Empty flag on a pipe pollset, this might
+ // hang on FreeBSD.
lttng_poll_add(&events, chan->wait_fd, 0);
break;
case CONSUMER_CHANNEL_DEL:
{
/*
- * This command should never be called if the channel
- * has streams monitored by either the data or metadata
- * thread. The consumer only notify this thread with a
- * channel del. command if it receives a destroy
- * channel command from the session daemon that send it
- * if a command prior to the GET_CHANNEL failed.
+ * This command should never be called if the
+ * channel has streams monitored by either the data
+ * or metadata thread. The consumer only notifies
+ * this thread with a channel del. command if it
+ * receives a destroy channel command from the
+ * session daemon, which sends it if a command prior
+ * to the GET_CHANNEL failed.
*/
rcu_read_lock();
chan = consumer_find_channel(key);
if (!chan) {
rcu_read_unlock();
- ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+ ERR("UST consumer get channel key %" PRIu64
+ " not found for del channel",
+ key);
break;
}
lttng_poll_del(&events, chan->wait_fd);
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
health_code_update();
- /* Destroy streams that might have been left in the stream list. */
+ /* Destroy streams that might have been left
+ * in the stream list. */
clean_channel_stream_list(chan);
break;
default:
}
/*
- * Release our own refcount. Force channel deletion even if
- * streams were not initialized.
+ * Release our own refcount. Force channel deletion
+ * even if streams were not initialized.
*/
if (!uatomic_sub_return(&chan->refcount, 1)) {
consumer_del_channel(chan);
}
case CONSUMER_CHANNEL_QUIT:
/*
- * Remove the pipe from the poll set and continue the loop
- * since their might be data to consume.
+ * Remove the pipe from the poll set and continue
+ * the loop since there might be data to consume.
*/
- lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ lttng_poll_del(&events,
+ ctx->consumer_channel_pipe[0]);
continue;
default:
ERR("Unknown action");
lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
continue;
} else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ ERR("Unexpected poll events %u for sock %d",
+ revents,
+ pollfd);
goto end;
}
node = lttng_ht_iter_get_node_u64(&iter);
LTTNG_ASSERT(node);
- chan = caa_container_of(node, struct lttng_consumer_channel,
- wait_fd_node);
+ chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
/* Check for error event */
if (revents & (LPOLLERR | LPOLLHUP)) {
consumer_close_channel_streams(chan);
/* Release our own refcount */
- if (!uatomic_sub_return(&chan->refcount, 1)
- && !uatomic_read(&chan->nb_init_stream_left)) {
+ if (!uatomic_sub_return(&chan->refcount, 1) &&
+ !uatomic_read(&chan->nb_init_stream_left)) {
consumer_del_channel(chan);
}
} else {
}
static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
- struct pollfd *sockpoll, int client_socket)
+ struct pollfd *sockpoll,
+ int client_socket)
{
int ret;
}
if (CMM_LOAD_SHARED(consumer_quit)) {
DBG("consumer_thread_receive_fds received quit from signal");
- err = 0; /* All is OK */
+ err = 0; /* All is OK */
goto end;
}
DBG("Received command on sock");
}
static int post_consume(struct lttng_consumer_stream *stream,
- const struct stream_subbuffer *subbuffer,
- struct lttng_consumer_local_data *ctx)
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
{
size_t i;
int ret = 0;
- const size_t count = lttng_dynamic_array_get_count(
- &stream->read_subbuffer_ops.post_consume_cbs);
+ const size_t count =
+ lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
for (i = 0; i < count; i++) {
const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
- &stream->read_subbuffer_ops.post_consume_cbs,
- i);
+ &stream->read_subbuffer_ops.post_consume_cbs, i);
ret = op(stream, subbuffer, ctx);
if (ret) {
}
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller)
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller)
{
ssize_t ret, written_bytes = 0;
int rotation_ret;
}
}
- get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(
- stream, &subbuffer);
+ get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
switch (get_next_status) {
case GET_NEXT_SUBBUFFER_STATUS_OK:
break;
abort();
}
- ret = stream->read_subbuffer_ops.pre_consume_subbuffer(
- stream, &subbuffer);
+ ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
if (ret) {
goto error_put_subbuf;
}
- written_bytes = stream->read_subbuffer_ops.consume_subbuffer(
- ctx, stream, &subbuffer);
+ written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
if (written_bytes <= 0) {
ERR("Error consuming subbuffer: (%zd)", written_bytes);
ret = (int) written_bytes;
goto error;
}
- the_consumer_data.channels_by_session_id_ht =
- lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!the_consumer_data.channels_by_session_id_ht) {
goto error;
}
goto error;
}
- the_consumer_data.stream_per_chan_id_ht =
- lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!the_consumer_data.stream_per_chan_id_ht) {
goto error;
}
* The caller MUST acquire a RCU read side lock before calling it.
*/
void consumer_add_relayd_socket(uint64_t net_seq_idx,
- int sock_type,
- struct lttng_consumer_local_data *ctx,
- int sock,
- struct pollfd *consumer_sockpoll,
- uint64_t sessiond_id,
- uint64_t relayd_session_id,
- uint32_t relayd_version_major,
- uint32_t relayd_version_minor,
- enum lttcomm_sock_proto relayd_socket_protocol)
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
+ struct pollfd *consumer_sockpoll,
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- fd = -1; /* Just in case it gets set with an invalid value. */
+ fd = -1; /* Just in case it gets set with an invalid value. */
/*
* Failing to receive FDs might indicate a major problem such as
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
ret = lttcomm_populate_sock_from_open_socket(
- &relayd->control_sock.sock, fd,
- relayd_socket_protocol);
+ &relayd->control_sock.sock, fd, relayd_socket_protocol);
/* Assign version values. */
relayd->control_sock.major = relayd_version_major;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
ret = lttcomm_populate_sock_from_open_socket(
- &relayd->data_sock.sock, fd,
- relayd_socket_protocol);
+ &relayd->data_sock.sock, fd, relayd_socket_protocol);
/* Assign version values. */
relayd->data_sock.major = relayd_version_major;
relayd->data_sock.minor = relayd_version_minor;
}
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
- sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
- relayd->net_seq_idx, fd);
+ sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
+ relayd->net_seq_idx,
+ fd);
/*
* We gave the ownership of the fd to the relayd structure. Set the
* fd to -1 so we don't call close() on it in the error path below.
ASSERT_RCU_READ_LOCKED();
/* Iterate over all relayd since they are indexed by net_seq_idx. */
- cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
- relayd, node.node) {
+ cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
/*
* Check by sessiond id which is unique here where the relayd session
* id might not be when having multiple relayd.
ht = the_consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct, &id,
- &iter.iter, stream, node_session_id.node) {
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct,
+ &id,
+ &iter.iter,
+ stream,
+ node_session_id.node)
+ {
pthread_mutex_lock(&stream->lock);
/*
/* Send init command for data pending. */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_begin_data_pending(&relayd->control_sock,
- relayd->relayd_session_id);
+ ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
if (ret < 0) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* Communication error thus the relayd so no data pending. */
}
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct, &id,
- &iter.iter, stream, node_session_id.node) {
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct,
+ &id,
+ &iter.iter,
+ stream,
+ node_session_id.node)
+ {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
- stream->relayd_stream_id);
+ stream->relayd_stream_id);
} else {
ret = relayd_data_pending(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->next_net_seq_num - 1);
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
}
if (ret == 1) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_pending;
} else if (ret < 0) {
- ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_not_pending;
}
/* Send end command for data pending. */
- ret = relayd_end_data_pending(&relayd->control_sock,
- relayd->relayd_session_id, &is_data_inflight);
+ ret = relayd_end_data_pending(
+ &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto data_not_pending;
}
*
* Return the sendmsg() return value.
*/
-int consumer_send_status_channel(int sock,
- struct lttng_consumer_channel *channel)
+int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
{
struct lttcomm_consumer_status_channel msg;
}
unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
- unsigned long produced_pos, uint64_t nb_packets_per_stream,
- uint64_t max_sb_size)
+ unsigned long produced_pos,
+ uint64_t nb_packets_per_stream,
+ uint64_t max_sb_size)
{
unsigned long start_pos;
if (!nb_packets_per_stream) {
- return consumed_pos; /* Grab everything */
+ return consumed_pos; /* Grab everything */
}
start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
start_pos -= max_sb_size * nb_packets_per_stream;
if ((long) (start_pos - consumed_pos) < 0) {
- return consumed_pos; /* Grab everything */
+ return consumed_pos; /* Grab everything */
}
return start_pos;
}
/* Stream lock must be held by the caller. */
static int sample_stream_positions(struct lttng_consumer_stream *stream,
- unsigned long *produced, unsigned long *consumed)
+ unsigned long *produced,
+ unsigned long *consumed)
{
int ret;
* Returns 0 on success, < 0 on error
*/
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
- uint64_t key, uint64_t relayd_id)
+ uint64_t key,
+ uint64_t relayd_id)
{
int ret;
struct lttng_consumer_stream *stream;
DBG("Consumer sample rotate position for channel %" PRIu64, key);
- lttng_dynamic_array_init(&stream_rotation_positions,
- sizeof(struct relayd_stream_rotation_position), NULL);
+ lttng_dynamic_array_init(
+ &stream_rotation_positions, sizeof(struct relayd_stream_rotation_position), NULL);
lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
rcu_read_lock();
pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
- chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
- &next_chunk_id);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
goto end_unlock_channel;
}
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
unsigned long produced_pos = 0, consumed_pos = 0;
health_code_update();
* for this stream during this trace
* chunk's lifetime.
*/
- ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+ ret = sample_stream_positions(
+ stream, &produced_pos, &consumed_pos);
if (ret) {
goto end_unlock_stream;
}
uint64_t trace_chunk_id;
chunk_status = lttng_trace_chunk_get_name(
- stream->trace_chunk,
- &trace_chunk_name,
- NULL);
+ stream->trace_chunk,
+ &trace_chunk_name,
+ NULL);
if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
trace_chunk_name = "none";
}
* never anonymous.
*/
chunk_status = lttng_trace_chunk_get_id(
- stream->trace_chunk,
- &trace_chunk_id);
+ stream->trace_chunk, &trace_chunk_id);
LTTNG_ASSERT(chunk_status ==
- LTTNG_TRACE_CHUNK_STATUS_OK);
+ LTTNG_TRACE_CHUNK_STATUS_OK);
DBG("Unable to open packet for stream during trace chunk's lifetime. "
- "Flushing an empty packet to prevent an empty file from being created: "
- "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
- stream->key, trace_chunk_name, trace_chunk_id);
+ "Flushing an empty packet to prevent an empty file from being created: "
+ "stream id = %" PRIu64
+ ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+ stream->key,
+ trace_chunk_name,
+ trace_chunk_id);
}
}
}
ret = consumer_stream_flush_buffer(stream, flush_active);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
- stream->key);
+ stream->key);
goto end_unlock_stream;
}
}
goto end_unlock_stream;
}
if (!ret) {
- ret = lttng_consumer_get_produced_snapshot(stream,
- &produced_pos);
+ ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Failed to sample produced position during channel rotation");
goto end_unlock_stream;
}
- ret = lttng_consumer_get_consumed_snapshot(stream,
- &consumed_pos);
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Failed to sample consumed position during channel rotation");
goto end_unlock_stream;
produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
if (consumed_pos == produced_pos) {
DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
- stream->key, produced_pos, consumed_pos);
+ stream->key,
+ produced_pos,
+ consumed_pos);
stream->rotate_ready = true;
} else {
DBG("Different consumed and produced positions "
- "for stream %" PRIu64 " produced = %lu consumed = %lu",
- stream->key, produced_pos, consumed_pos);
+ "for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key,
+ produced_pos,
+ consumed_pos);
}
/*
* The rotation position is based on the packet_seq_num of the
* not implement packet sequence number.
*/
ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
- stream->key);
+ stream->key);
ret = -1;
goto end_unlock_stream;
}
stream->rotate_position = stream->last_sequence_number + 1 +
- ((produced_pos - consumed_pos) / stream->max_sb_size);
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
- stream->key, stream->rotate_position);
+ stream->key,
+ stream->rotate_position);
if (!is_local_trace) {
/*
.rotate_at_seq_num = stream->rotate_position,
};
- ret = lttng_dynamic_array_add_element(
- &stream_rotation_positions,
- &position);
+ ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
+ &position);
if (ret) {
ERR("Failed to allocate stream rotation position");
goto end_unlock_stream;
* is performed in a stream that has no active trace
* chunk.
*/
- ret = lttng_dynamic_pointer_array_add_pointer(
- &streams_packet_to_open, stream);
+ ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
+ stream);
if (ret) {
PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
ret = -1;
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
- rotating_to_new_chunk ? &next_chunk_id : NULL,
- (const struct relayd_stream_rotation_position *)
- stream_rotation_positions.buffer
- .data);
+ ret = relayd_rotate_streams(&relayd->control_sock,
+ stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
- relayd->net_seq_idx);
+ relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
goto end_unlock_channel;
}
}
for (stream_idx = 0;
- stream_idx < lttng_dynamic_pointer_array_get_count(
- &streams_packet_to_open);
- stream_idx++) {
+ stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
+ stream_idx++) {
enum consumer_stream_open_packet_status status;
stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
- &streams_packet_to_open, stream_idx);
+ &streams_packet_to_open, stream_idx);
pthread_mutex_lock(&stream->lock);
status = consumer_stream_open_packet(stream);
case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
DBG("Opened a packet after a rotation: stream id = %" PRIu64
", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
break;
case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
/*
*/
DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
break;
case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
/* Logged by callee. */
return ret;
}
-static
-int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
{
int ret = 0;
unsigned long consumed_pos_before, consumed_pos_after;
return ret;
}
-static
-int consumer_clear_stream(struct lttng_consumer_stream *stream)
+static int consumer_clear_stream(struct lttng_consumer_stream *stream)
{
int ret;
ret = consumer_stream_flush_buffer(stream, 1);
if (ret < 0) {
- ERR("Failed to flush stream %" PRIu64 " during channel clear",
- stream->key);
+ ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
ret = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
ret = consumer_clear_buffer(stream);
if (ret < 0) {
- ERR("Failed to clear stream %" PRIu64 " during channel clear",
- stream->key);
+ ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
ret = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
return ret;
}
-static
-int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
{
int ret;
struct lttng_consumer_stream *stream;
rcu_read_lock();
pthread_mutex_lock(&channel->lock);
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
pthread_mutex_lock(&stream->lock);
ret = consumer_clear_stream(stream);
*/
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
{
- DBG("Check is rotate ready for stream %" PRIu64
- " ready %u rotate_position %" PRIu64
- " last_sequence_number %" PRIu64,
- stream->key, stream->rotate_ready,
- stream->rotate_position, stream->last_sequence_number);
+ DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
+ " last_sequence_number %" PRIu64,
+ stream->key,
+ stream->rotate_ready,
+ stream->rotate_position,
+ stream->last_sequence_number);
if (stream->rotate_ready) {
return 1;
}
*/
if (stream->sequence_number_unavailable) {
ERR("Internal error: rotation used on stream %" PRIu64
- " with unavailable sequence number",
- stream->key);
+ " with unavailable sequence number",
+ stream->key);
return -1;
}
- if (stream->rotate_position == -1ULL ||
- stream->last_sequence_number == -1ULL) {
+ if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
return 0;
}
* packet of the current chunk, hence the "rotate_position - 1".
*/
- DBG("Check is rotate ready for stream %" PRIu64
- " last_sequence_number %" PRIu64
- " rotate_position %" PRIu64,
- stream->key, stream->last_sequence_number,
- stream->rotate_position);
+ DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
+ " rotate_position %" PRIu64,
+ stream->key,
+ stream->last_sequence_number,
+ stream->rotate_position);
if (stream->last_sequence_number >= stream->rotate_position - 1) {
return 1;
}
*/
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
{
- DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64,
- stream->key);
+ DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
stream->rotate_position = -1ULL;
stream->rotate_ready = false;
}
/*
* Perform the rotation a local stream file.
*/
-static
-int rotate_local_stream(struct lttng_consumer_stream *stream)
+static int rotate_local_stream(struct lttng_consumer_stream *stream)
{
int ret = 0;
DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
- stream->key,
- stream->chan->key);
+ stream->key,
+ stream->chan->key);
stream->tracefile_size_current = 0;
stream->tracefile_count_current = 0;
ret = close(stream->out_fd);
if (ret) {
PERROR("Failed to close stream out_fd of channel \"%s\"",
- stream->chan->name);
+ stream->chan->name);
}
stream->out_fd = -1;
}
* anything until a new trace chunk is created.
*/
stream->trace_chunk = NULL;
- } else if (stream->chan->trace_chunk &&
- !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
+ } else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
ret = -1;
goto error;
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
- uint64_t key)
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
int ret;
struct lttng_consumer_stream *stream;
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
health_code_update();
pthread_mutex_lock(&stream->chan->lock);
return ret;
}
-enum lttcomm_return_code lttng_consumer_init_command(
- struct lttng_consumer_local_data *ctx,
- const lttng_uuid& sessiond_uuid)
+enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
+ const lttng_uuid& sessiond_uuid)
{
enum lttcomm_return_code ret;
char uuid_str[LTTNG_UUID_STR_LEN];
return ret;
}
-enum lttcomm_return_code lttng_consumer_create_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id,
- time_t chunk_creation_timestamp,
- const char *chunk_override_name,
- const struct lttng_credentials *credentials,
- struct lttng_directory_handle *chunk_directory_handle)
+enum lttcomm_return_code
+lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle)
{
int ret;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
if (relayd_id) {
/* Only used for logging purposes. */
- ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
- "%" PRIu64, *relayd_id);
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
relayd_id_str = relayd_id_buffer;
} else {
/* Local protocol error. */
LTTNG_ASSERT(chunk_creation_timestamp);
ret = time_to_iso8601_str(chunk_creation_timestamp,
- creation_timestamp_buffer,
- sizeof(creation_timestamp_buffer));
- creation_timestamp_str = !ret ? creation_timestamp_buffer :
- "(formatting error)";
+ creation_timestamp_buffer,
+ sizeof(creation_timestamp_buffer));
+ creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
DBG("Consumer create trace chunk command: relay_id = %s"
- ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
- ", chunk_override_name = %s"
- ", chunk_creation_timestamp = %s",
- relayd_id_str, session_id, chunk_id,
- chunk_override_name ? : "(none)",
- creation_timestamp_str);
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
+ ", chunk_creation_timestamp = %s",
+ relayd_id_str,
+ session_id,
+ chunk_id,
+ chunk_override_name ?: "(none)",
+ creation_timestamp_str);
/*
* The trace chunk registry, as used by the consumer daemon, implicitly
* the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
* and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
*/
- created_chunk = lttng_trace_chunk_create(chunk_id,
- chunk_creation_timestamp, NULL);
+ created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, NULL);
if (!created_chunk) {
ERR("Failed to create trace chunk");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
}
if (chunk_override_name) {
- chunk_status = lttng_trace_chunk_override_name(created_chunk,
- chunk_override_name);
+ chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
goto error;
}
if (chunk_directory_handle) {
- chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
- credentials);
+ chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to set trace chunk credentials");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
* The consumer daemon has no ownership of the chunk output
* directory.
*/
- chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
- chunk_directory_handle);
+ chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
chunk_directory_handle = NULL;
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to set trace chunk's directory handle");
}
published_chunk = lttng_trace_chunk_registry_publish_chunk(
- the_consumer_data.chunk_registry, session_id,
- created_chunk);
+ the_consumer_data.chunk_registry, session_id, created_chunk);
lttng_trace_chunk_put(created_chunk);
created_chunk = NULL;
if (!published_chunk) {
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(
- the_consumer_data.channels_by_session_id_ht->ht,
- the_consumer_data.channels_by_session_id_ht->hash_fct(
- &session_id, lttng_ht_seed),
- the_consumer_data.channels_by_session_id_ht->match_fct,
- &session_id, &iter.iter, channel,
- channels_by_session_id_ht_node.node) {
- ret = lttng_consumer_channel_set_trace_chunk(channel,
- published_chunk);
+ the_consumer_data.channels_by_session_id_ht->ht,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id,
+ &iter.iter,
+ channel,
+ channels_by_session_id_ht_node.node)
+ {
+ ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
if (ret) {
/*
* Roll-back the creation of this chunk.
DBG("Failed to set new trace chunk on existing channels, rolling back");
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
- session_id, chunk_id,
- chunk_creation_timestamp, NULL,
- path);
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ NULL,
+ path);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
- session_id, chunk_id);
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
}
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
relayd = consumer_find_relayd(*relayd_id);
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_create_trace_chunk(
- &relayd->control_sock, published_chunk);
+ ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
char path[LTTNG_PATH_MAX];
close_ret = lttng_consumer_close_trace_chunk(relayd_id,
- session_id,
- chunk_id,
- chunk_creation_timestamp,
- NULL, path);
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ NULL,
+ path);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
- session_id,
- chunk_id);
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
}
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
return ret_code;
}
-enum lttcomm_return_code lttng_consumer_close_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp,
- const enum lttng_trace_chunk_command_type *close_command,
- char *path)
+enum lttcomm_return_code
+lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path)
{
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_trace_chunk *chunk;
int ret;
/* Only used for logging purposes. */
- ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
- "%" PRIu64, *relayd_id);
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
relayd_id_str = relayd_id_buffer;
} else {
}
}
if (close_command) {
- close_command_name = lttng_trace_chunk_command_type_get_name(
- *close_command);
+ close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
}
DBG("Consumer close trace chunk command: relayd_id = %s"
- ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
- ", close command = %s",
- relayd_id_str, session_id, chunk_id,
- close_command_name);
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
+ relayd_id_str,
+ session_id,
+ chunk_id,
+ close_command_name);
chunk = lttng_trace_chunk_registry_find_chunk(
- the_consumer_data.chunk_registry, session_id, chunk_id);
+ the_consumer_data.chunk_registry, session_id, chunk_id);
if (!chunk) {
- ERR("Failed to find chunk: session_id = %" PRIu64
- ", chunk_id = %" PRIu64,
- session_id, chunk_id);
+ ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
goto end;
}
- chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
- chunk_close_timestamp);
+ chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
goto end;
}
if (close_command) {
- chunk_status = lttng_trace_chunk_set_close_command(
- chunk, *close_command);
+ chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
goto end;
* current chunk found in the session's channels.
*/
rcu_read_lock();
- cds_lfht_for_each_entry(the_consumer_data.channel_ht->ht, &iter.iter,
- channel, node.node) {
+ cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
int ret;
/*
relayd = consumer_find_relayd(*relayd_id);
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_close_trace_chunk(
- &relayd->control_sock, chunk,
- path);
+ ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
- ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
- *relayd_id);
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
}
if (!relayd || ret) {
return ret_code;
}
-enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id)
+enum lttcomm_return_code
+lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
{
int ret;
enum lttcomm_return_code ret_code;
if (relayd_id) {
/* Only used for logging purposes. */
- ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
- "%" PRIu64, *relayd_id);
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
relayd_id_str = relayd_id_buffer;
} else {
}
DBG("Consumer trace chunk exists command: relayd_id = %s"
- ", chunk_id = %" PRIu64, relayd_id_str,
- chunk_id);
+ ", chunk_id = %" PRIu64,
+ relayd_id_str,
+ chunk_id);
ret = lttng_trace_chunk_registry_chunk_exists(
- the_consumer_data.chunk_registry, session_id, chunk_id,
- &chunk_exists_local);
+ the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
if (ret) {
/* Internal error. */
ERR("Failed to query the existence of a trace chunk");
ret_code = LTTCOMM_CONSUMERD_FATAL;
goto end;
}
- DBG("Trace chunk %s locally",
- chunk_exists_local ? "exists" : "does not exist");
+ DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
if (chunk_exists_local) {
ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
goto end;
}
DBG("Looking up existence of trace chunk on relay daemon");
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
- &chunk_exists_remote);
+ ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Failed to look-up the existence of trace chunk on relay daemon");
goto end_rcu_unlock;
}
- ret_code = chunk_exists_remote ?
- LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
- LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
- DBG("Trace chunk %s on relay daemon",
- chunk_exists_remote ? "exists" : "does not exist");
+ ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
+ LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
end_rcu_unlock:
rcu_read_unlock();
return ret_code;
}
-static
-int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
+static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key,
- &iter.iter, stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
/*
* Protect against teardown with mutex.
*/
return ret;
}
-enum lttcomm_return_code lttng_consumer_open_channel_packets(
- struct lttng_consumer_channel *channel)
+enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
struct lttng_consumer_stream *stream;
enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
}
rcu_read_lock();
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
enum consumer_stream_open_packet_status status;
pthread_mutex_lock(&stream->lock);
switch (status) {
case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
stream->opened_packet_in_current_trace_chunk = true;
break;
case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
break;
case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
/*
* failing. Report an unknown error.
*/
ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
- ", channel id = %" PRIu64
- ", channel name = %s"
- ", session id = %" PRIu64,
- stream->key, channel->key,
- channel->name, channel->session_id);
+ ", channel id = %" PRIu64 ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key,
+ channel->key,
+ channel->name,
+ channel->session_id);
ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
goto error_unlock;
default: