*/
#define _LGPL_SOURCE
+#include "kernel-consumer.hpp"
+
+#include <common/buffer-view.hpp>
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/consumer/metadata-bucket.hpp>
+#include <common/index/index.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/optional.hpp>
+#include <common/pipe.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/utils.hpp>
+
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
+#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
+#include <sys/stat.h>
#include <sys/types.h>
-#include <inttypes.h>
#include <unistd.h>
-#include <sys/stat.h>
-#include <stdint.h>
-
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/compat/fcntl.h>
-#include <common/compat/endian.h>
-#include <common/pipe.h>
-#include <common/relayd/relayd.h>
-#include <common/utils.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/index/index.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/optional.h>
-#include <common/buffer-view.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/metadata-bucket.h>
-
-#include "kernel-consumer.h"
extern struct lttng_consumer_global_data the_consumer_data;
extern int consumer_poll_timeout;
*
* Returns 0 on success, < 0 on error.
*/
-int lttng_kconsumer_sample_snapshot_positions(
- struct lttng_consumer_stream *stream)
+int lttng_kconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
{
LTTNG_ASSERT(stream);
*
* Returns 0 on success, < 0 on error
*/
-int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos)
+int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
{
int ret;
int infd = stream->wait_fd;
*
* Returns 0 on success, < 0 on error
*/
-int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos)
+int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
{
int ret;
int infd = stream->wait_fd;
return ret;
}
-static
-int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
- const char **addr)
+static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr)
{
int ret;
unsigned long mmap_offset;
*
* Returns 0 on success, < 0 on error
*/
-static int lttng_kconsumer_snapshot_channel(
- struct lttng_consumer_channel *channel,
- uint64_t key, char *path, uint64_t relayd_id,
- uint64_t nb_packets_per_stream,
- struct lttng_consumer_local_data *ctx)
+static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *channel,
+ uint64_t key,
+ char *path,
+ uint64_t relayd_id,
+ uint64_t nb_packets_per_stream)
{
int ret;
struct lttng_consumer_stream *stream;
/* Splice is not supported yet for channel snapshot. */
if (channel->output != CONSUMER_CHANNEL_MMAP) {
ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
- channel->name);
+ channel->name);
ret = -1;
goto end;
}
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
unsigned long consumed_pos, produced_pos;
health_code_update();
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
- goto end_unlock;
+ goto error_close_stream_output;
}
} else {
- ret = consumer_stream_create_output_files(stream,
- false);
+ ret = consumer_stream_create_output_files(stream, false);
if (ret < 0) {
- goto end_unlock;
+ goto error_close_stream_output;
}
- DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
- stream->key);
+ DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key);
}
ret = kernctl_buffer_flush_empty(stream->wait_fd);
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto end_unlock;
+ goto error_close_stream_output;
}
goto end_unlock;
}
ret = lttng_kconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking kernel snapshot");
- goto end_unlock;
+ goto error_close_stream_output;
}
ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced kernel snapshot position");
- goto end_unlock;
+ goto error_close_stream_output;
}
ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd kernel snapshot position");
- goto end_unlock;
+ goto error_close_stream_output;
}
- consumed_pos = consumer_get_consume_start_pos(consumed_pos,
- produced_pos, nb_packets_per_stream,
- stream->max_sb_size);
+ consumed_pos = consumer_get_consume_start_pos(
+ consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size);
while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
- goto end_unlock;
+ goto error_close_stream_output;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
goto error_put_subbuf;
}
- subbuf_view = lttng_buffer_view_init(
- subbuf_addr, 0, padded_len);
+ subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
read_len = lttng_consumer_on_read_subbuffer_mmap(
- stream, &subbuf_view,
- padded_len - len);
+ stream, &subbuf_view, padded_len - len);
/*
* We write the padded len in local tracefiles but the data len
* when using a relay. Display the error but continue processing
if (relayd_id != (uint64_t) -1ULL) {
if (read_len != len) {
ERR("Error sending to the relay (ret: %zd != len: %lu)",
- read_len, len);
+ read_len,
+ len);
}
} else {
if (read_len != padded_len) {
ERR("Error writing to tracefile (ret: %zd != len: %lu)",
- read_len, padded_len);
+ read_len,
+ padded_len);
}
}
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf");
- goto end_unlock;
+ goto error_close_stream_output;
}
consumed_pos += stream->max_sb_size;
}
- if (relayd_id == (uint64_t) -1ULL) {
- if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
- if (ret < 0) {
- PERROR("Kernel consumer snapshot close out_fd");
- goto end_unlock;
- }
- stream->out_fd = -1;
- }
- } else {
- close_relayd_stream(stream);
- stream->net_seq_idx = (uint64_t) -1ULL;
- }
- lttng_trace_chunk_put(stream->trace_chunk);
- stream->trace_chunk = NULL;
+ consumer_stream_close_output(stream);
pthread_mutex_unlock(&stream->lock);
}
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf error path");
}
+error_close_stream_output:
+ consumer_stream_close_output(stream);
end_unlock:
pthread_mutex_unlock(&stream->lock);
end:
*
* Returns 0 on success, < 0 on error
*/
-static int lttng_kconsumer_snapshot_metadata(
- struct lttng_consumer_channel *metadata_channel,
- uint64_t key, char *path, uint64_t relayd_id,
- struct lttng_consumer_local_data *ctx)
+static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
+ uint64_t key,
+ char *path,
+ uint64_t relayd_id,
+ struct lttng_consumer_local_data *ctx)
{
int ret, use_relayd = 0;
ssize_t ret_read;
LTTNG_ASSERT(ctx);
- DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
- key, path);
+ DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
rcu_read_lock();
goto error_snapshot;
}
} else {
- ret = consumer_stream_create_output_files(metadata_stream,
- false);
+ ret = consumer_stream_create_output_files(metadata_stream, false);
if (ret < 0) {
goto error_snapshot;
}
ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
if (ret_read < 0) {
- ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
- ret_read);
+ ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read);
ret = ret_read;
goto error_snapshot;
}
* Return 1 on success else a negative value or 0.
*/
int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
- int sock, struct pollfd *consumer_sockpoll)
+ int sock,
+ struct pollfd *consumer_sockpoll)
{
int ret_func;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret_recv != sizeof(msg)) {
if (ret_recv > 0) {
- lttng_consumer_send_error(ctx,
- LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
ret_recv = -1;
}
return ret_recv;
{
uint32_t major = msg.u.relayd_sock.major;
uint32_t minor = msg.u.relayd_sock.minor;
- enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
- msg.u.relayd_sock.relayd_socket_protocol;
+ enum lttcomm_sock_proto protocol =
+ (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
/* Session daemon status message are handled in the following call. */
consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
- msg.u.relayd_sock.type, ctx, sock,
- consumer_sockpoll, msg.u.relayd_sock.session_id,
- msg.u.relayd_sock.relayd_session_id, major,
- minor, protocol);
+ msg.u.relayd_sock.type,
+ ctx,
+ sock,
+ consumer_sockpoll,
+ msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id,
+ major,
+ minor,
+ protocol);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
health_code_update();
DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
- new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
- msg.u.channel.session_id,
- msg.u.channel.chunk_id.is_set ?
- &chunk_id : NULL,
- msg.u.channel.pathname,
- msg.u.channel.name,
- msg.u.channel.relayd_id, msg.u.channel.output,
- msg.u.channel.tracefile_size,
- msg.u.channel.tracefile_count, 0,
- msg.u.channel.monitor,
- msg.u.channel.live_timer_interval,
- msg.u.channel.is_live,
- NULL, NULL);
+ new_channel =
+ consumer_allocate_channel(msg.u.channel.channel_key,
+ msg.u.channel.session_id,
+ msg.u.channel.chunk_id.is_set ? &chunk_id : NULL,
+ msg.u.channel.pathname,
+ msg.u.channel.name,
+ msg.u.channel.relayd_id,
+ msg.u.channel.output,
+ msg.u.channel.tracefile_size,
+ msg.u.channel.tracefile_count,
+ 0,
+ msg.u.channel.monitor,
+ msg.u.channel.live_timer_interval,
+ msg.u.channel.is_live,
+ NULL,
+ NULL);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
health_code_update();
if (ctx->on_recv_channel != NULL) {
- int ret_recv_channel =
- ctx->on_recv_channel(new_channel);
+ int ret_recv_channel = ctx->on_recv_channel(new_channel);
if (ret_recv_channel == 0) {
- ret_add_channel = consumer_add_channel(
- new_channel, ctx);
+ ret_add_channel = consumer_add_channel(new_channel, ctx);
} else if (ret_recv_channel < 0) {
goto end_nosignal;
}
} else {
- ret_add_channel =
- consumer_add_channel(new_channel, ctx);
+ ret_add_channel = consumer_add_channel(new_channel, ctx);
}
- if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
- !ret_add_channel) {
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret_add_channel) {
int monitor_start_ret;
DBG("Consumer starting monitor timer");
- consumer_timer_live_start(new_channel,
- msg.u.channel.live_timer_interval);
+ consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval);
monitor_start_ret = consumer_timer_monitor_start(
- new_channel,
- msg.u.channel.monitor_timer_interval);
+ new_channel, msg.u.channel.monitor_timer_interval);
if (monitor_start_ret < 0) {
ERR("Starting channel monitoring timer failed");
goto end_nosignal;
/* If we received an error in add_channel, we need to report it. */
if (ret_add_channel < 0) {
- ret_send_status = consumer_send_status_msg(
- sock, ret_add_channel);
+ ret_send_status = consumer_send_status_msg(sock, ret_add_channel);
if (ret_send_status < 0) {
goto error_fatal;
}
health_code_update();
pthread_mutex_lock(&channel->lock);
- new_stream = consumer_stream_create(
- channel,
- channel->key,
- fd,
- channel->name,
- channel->relayd_id,
- channel->session_id,
- channel->trace_chunk,
- msg.u.stream.cpu,
- &alloc_ret,
- channel->type,
- channel->monitor);
+ new_stream = consumer_stream_create(channel,
+ channel->key,
+ fd,
+ channel->name,
+ channel->relayd_id,
+ channel->session_id,
+ channel->trace_chunk,
+ msg.u.stream.cpu,
+ &alloc_ret,
+ channel->type,
+ channel->monitor);
if (new_stream == NULL) {
switch (alloc_ret) {
case -ENOMEM:
}
new_stream->wait_fd = fd;
- ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
- new_stream->wait_fd, &new_stream->max_sb_size);
+ ret_get_max_subbuf_size =
+ kernctl_get_max_subbuf_size(new_stream->wait_fd, &new_stream->max_sb_size);
if (ret_get_max_subbuf_size < 0) {
pthread_mutex_unlock(&channel->lock);
ERR("Failed to get kernel maximal subbuffer size");
goto error_add_stream_nosignal;
}
- consumer_stream_update_channel_attributes(new_stream,
- channel);
+ consumer_stream_update_channel_attributes(new_stream, channel);
/*
* We've just assigned the channel to the stream so increment the
/* Do not monitor this stream. */
if (!channel->monitor) {
DBG("Kernel consumer add stream %s in no monitor mode with "
- "relayd id %" PRIu64, new_stream->name,
- new_stream->net_seq_idx);
+ "relayd id %" PRIu64,
+ new_stream->name,
+ new_stream->net_seq_idx);
cds_list_add(&new_stream->send_node, &channel->streams.head);
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
int ret_send_relayd_stream;
- ret_send_relayd_stream = consumer_send_relayd_stream(
- new_stream, new_stream->chan->pathname);
+ ret_send_relayd_stream =
+ consumer_send_relayd_stream(new_stream, new_stream->chan->pathname);
if (ret_send_relayd_stream < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
int ret_send_relayd_streams_sent;
ret_send_relayd_streams_sent =
- consumer_send_relayd_streams_sent(
- new_stream->net_seq_idx);
+ consumer_send_relayd_streams_sent(new_stream->net_seq_idx);
if (ret_send_relayd_streams_sent < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
health_code_update();
- ret_pipe_write = lttng_pipe_write(
- stream_pipe, &new_stream, sizeof(new_stream));
+ ret_pipe_write = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret_pipe_write < 0) {
ERR("Consumer write %s stream to pipe %d",
- new_stream->metadata_flag ? "metadata" : "data",
- lttng_pipe_get_writefd(stream_pipe));
+ new_stream->metadata_flag ? "metadata" : "data",
+ lttng_pipe_get_writefd(stream_pipe));
if (new_stream->metadata_flag) {
consumer_del_stream_for_metadata(new_stream);
} else {
}
DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
- new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
-end_add_stream:
+ new_stream->name,
+ fd,
+ new_stream->chan->pathname,
+ new_stream->relayd_stream_id);
+ end_add_stream:
break;
-error_add_stream_nosignal:
+ error_add_stream_nosignal:
goto end_nosignal;
-error_add_stream_fatal:
+ error_add_stream_fatal:
goto error_fatal;
}
case LTTNG_CONSUMER_STREAMS_SENT:
* We could not find the channel. Can happen if cpu hotplug
* happens while tearing down.
*/
- ERR("Unable to find channel key %" PRIu64,
- msg.u.sent_streams.channel_key);
+ ERR("Unable to find channel key %" PRIu64, msg.u.sent_streams.channel_key);
ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
}
* Send status code to session daemon.
*/
ret_send_status = consumer_send_status_msg(sock, ret_code);
- if (ret_send_status < 0 ||
- ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
+ if (ret_send_status < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
/* Somehow, the session daemon is not responding anymore. */
goto error_streams_sent_nosignal;
}
if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
int ret_send_relay_streams;
- ret_send_relay_streams = consumer_send_relayd_streams_sent(
- msg.u.sent_streams.net_seq_idx);
+ ret_send_relay_streams =
+ consumer_send_relayd_streams_sent(msg.u.sent_streams.net_seq_idx);
if (ret_send_relay_streams < 0) {
goto error_streams_sent_nosignal;
}
channel->streams_sent_to_relayd = true;
}
-end_error_streams_sent:
+ end_error_streams_sent:
break;
-error_streams_sent_nosignal:
+ error_streams_sent_nosignal:
goto end_nosignal;
}
case LTTNG_CONSUMER_UPDATE_STREAM:
health_code_update();
/* Send back returned value to session daemon */
- ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
- sizeof(ret_data_pending));
+ ret_send =
+ lttcomm_send_unix_sock(sock, &ret_data_pending, sizeof(ret_data_pending));
if (ret_send < 0) {
PERROR("send data pending ret code");
goto error_fatal;
int ret_snapshot;
ret_snapshot = lttng_kconsumer_snapshot_metadata(
- channel, key,
- msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id,
- ctx);
+ channel,
+ key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id,
+ ctx);
if (ret_snapshot < 0) {
ERR("Snapshot metadata failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
int ret_snapshot;
ret_snapshot = lttng_kconsumer_snapshot_channel(
- channel, key,
- msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id,
- msg.u.snapshot_channel
- .nb_packets_per_stream,
- ctx);
+ channel,
+ key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel.nb_packets_per_stream);
if (ret_snapshot < 0) {
ERR("Snapshot channel failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
consumer_del_channel(channel);
-end_destroy_channel:
+ end_destroy_channel:
goto end_nosignal;
}
case LTTNG_CONSUMER_DISCARDED_EVENTS:
uint64_t id = msg.u.discarded_events.session_id;
uint64_t key = msg.u.discarded_events.channel_key;
- DBG("Kernel consumer discarded events command for session id %"
- PRIu64 ", channel key %" PRIu64, id, key);
+ DBG("Kernel consumer discarded events command for session id %" PRIu64
+ ", channel key %" PRIu64,
+ id,
+ key);
channel = consumer_find_channel(key);
if (!channel) {
- ERR("Kernel consumer discarded events channel %"
- PRIu64 " not found", key);
+ ERR("Kernel consumer discarded events channel %" PRIu64 " not found", key);
count = 0;
} else {
count = channel->discarded_events;
uint64_t id = msg.u.lost_packets.session_id;
uint64_t key = msg.u.lost_packets.channel_key;
- DBG("Kernel consumer lost packets command for session id %"
- PRIu64 ", channel key %" PRIu64, id, key);
+ DBG("Kernel consumer lost packets command for session id %" PRIu64
+ ", channel key %" PRIu64,
+ id,
+ key);
channel = consumer_find_channel(key);
if (!channel) {
- ERR("Kernel consumer lost packets channel %"
- PRIu64 " not found", key);
+ ERR("Kernel consumer lost packets channel %" PRIu64 " not found", key);
count = 0;
} else {
count = channel->lost_packets;
goto error_fatal;
}
- ret_recv = lttcomm_recv_fds_unix_sock(
- sock, &channel_monitor_pipe, 1);
+ ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1);
if (ret_recv != sizeof(channel_monitor_pipe)) {
ERR("Failed to receive channel monitor pipe");
goto error_fatal;
DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
ret_set_channel_monitor_pipe =
- consumer_timer_thread_set_channel_monitor_pipe(
- channel_monitor_pipe);
+ consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe);
if (!ret_set_channel_monitor_pipe) {
int flags;
int ret_fcntl;
}
flags = ret_fcntl;
- ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
- flags | O_NONBLOCK);
+ ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK);
if (ret_fcntl == -1) {
PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
goto error_fatal;
int ret_rotate_channel;
ret_rotate_channel = lttng_consumer_rotate_channel(
- channel, key,
- msg.u.rotate_channel.relayd_id,
- msg.u.rotate_channel.metadata, ctx);
+ channel, key, msg.u.rotate_channel.relayd_id);
if (ret_rotate_channel < 0) {
ERR("Rotate channel failed");
ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
/* Rotate the streams that are ready right now. */
int ret_rotate;
- ret_rotate = lttng_consumer_rotate_ready_streams(
- channel, key, ctx);
+ ret_rotate = lttng_consumer_rotate_ready_streams(channel, key);
if (ret_rotate < 0) {
ERR("Rotate ready streams failed");
}
}
break;
-error_rotate_channel:
+ error_rotate_channel:
goto end_nosignal;
}
case LTTNG_CONSUMER_CLEAR_CHANNEL:
} else {
int ret_clear_channel;
- ret_clear_channel =
- lttng_consumer_clear_channel(channel);
+ ret_clear_channel = lttng_consumer_clear_channel(channel);
if (ret_clear_channel) {
ERR("Clear channel failed");
ret_code = (lttcomm_return_code) ret_clear_channel;
case LTTNG_CONSUMER_INIT:
{
int ret_send_status;
+ lttng_uuid sessiond_uuid;
- ret_code = lttng_consumer_init_command(ctx,
- msg.u.init.sessiond_uuid);
+ std::copy(std::begin(msg.u.init.sessiond_uuid),
+ std::end(msg.u.init.sessiond_uuid),
+ sessiond_uuid.begin());
+
+ ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
health_code_update();
ret_send_status = consumer_send_status_msg(sock, ret_code);
if (ret_send_status < 0) {
case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
{
const struct lttng_credentials credentials = {
- .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
- .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
+ .uid = LTTNG_OPTIONAL_INIT_VALUE(
+ msg.u.create_trace_chunk.credentials.value.uid),
+ .gid = LTTNG_OPTIONAL_INIT_VALUE(
+ msg.u.create_trace_chunk.credentials.value.gid),
};
- const bool is_local_trace =
- !msg.u.create_trace_chunk.relayd_id.is_set;
- const uint64_t relayd_id =
- msg.u.create_trace_chunk.relayd_id.value;
- const char *chunk_override_name =
- *msg.u.create_trace_chunk.override_name ?
- msg.u.create_trace_chunk.override_name :
- NULL;
+ const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set;
+ const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
+ const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
+ msg.u.create_trace_chunk.override_name :
+ NULL;
struct lttng_directory_handle *chunk_directory_handle = NULL;
/*
ssize_t ret_recv;
/* Acnowledge the reception of the command. */
- ret_send_status = consumer_send_status_msg(
- sock, LTTCOMM_CONSUMERD_SUCCESS);
+ ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
- ret_recv = lttcomm_recv_fds_unix_sock(
- sock, &chunk_dirfd, 1);
+ ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
if (ret_recv != sizeof(chunk_dirfd)) {
ERR("Failed to receive trace chunk directory file descriptor");
goto error_fatal;
}
- DBG("Received trace chunk directory fd (%d)",
- chunk_dirfd);
- chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
- chunk_dirfd);
+ DBG("Received trace chunk directory fd (%d)", chunk_dirfd);
+ chunk_directory_handle =
+ lttng_directory_handle_create_from_dirfd(chunk_dirfd);
if (!chunk_directory_handle) {
ERR("Failed to initialize chunk directory handle from directory file descriptor");
if (close(chunk_dirfd)) {
}
ret_code = lttng_consumer_create_trace_chunk(
- !is_local_trace ? &relayd_id : NULL,
- msg.u.create_trace_chunk.session_id,
- msg.u.create_trace_chunk.chunk_id,
- (time_t) msg.u.create_trace_chunk
- .creation_timestamp,
- chunk_override_name,
- msg.u.create_trace_chunk.credentials.is_set ?
- &credentials :
- NULL,
- chunk_directory_handle);
+ !is_local_trace ? &relayd_id : NULL,
+ msg.u.create_trace_chunk.session_id,
+ msg.u.create_trace_chunk.chunk_id,
+ (time_t) msg.u.create_trace_chunk.creation_timestamp,
+ chunk_override_name,
+ msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL,
+ chunk_directory_handle);
lttng_directory_handle_put(chunk_directory_handle);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
{
enum lttng_trace_chunk_command_type close_command =
- (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
- const uint64_t relayd_id =
- msg.u.close_trace_chunk.relayd_id.value;
+ (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
+ const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value;
struct lttcomm_consumer_close_trace_chunk_reply reply;
char path[LTTNG_PATH_MAX];
ssize_t ret_send;
ret_code = lttng_consumer_close_trace_chunk(
- msg.u.close_trace_chunk.relayd_id.is_set ?
- &relayd_id :
- NULL,
- msg.u.close_trace_chunk.session_id,
- msg.u.close_trace_chunk.chunk_id,
- (time_t) msg.u.close_trace_chunk.close_timestamp,
- msg.u.close_trace_chunk.close_command.is_set ?
- &close_command :
- NULL, path);
+ msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL,
+ msg.u.close_trace_chunk.session_id,
+ msg.u.close_trace_chunk.chunk_id,
+ (time_t) msg.u.close_trace_chunk.close_timestamp,
+ msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL,
+ path);
reply.ret_code = ret_code;
reply.path_length = strlen(path) + 1;
ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
if (ret_send != sizeof(reply)) {
goto error_fatal;
}
- ret_send = lttcomm_send_unix_sock(
- sock, path, reply.path_length);
+ ret_send = lttcomm_send_unix_sock(sock, path, reply.path_length);
if (ret_send != reply.path_length) {
goto error_fatal;
}
}
case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
{
- const uint64_t relayd_id =
- msg.u.trace_chunk_exists.relayd_id.value;
+ const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
ret_code = lttng_consumer_trace_chunk_exists(
- msg.u.trace_chunk_exists.relayd_id.is_set ?
- &relayd_id : NULL,
- msg.u.trace_chunk_exists.session_id,
- msg.u.trace_chunk_exists.chunk_id);
+ msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL,
+ msg.u.trace_chunk_exists.session_id,
+ msg.u.trace_chunk_exists.chunk_id);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
{
const uint64_t key = msg.u.open_channel_packets.key;
- struct lttng_consumer_channel *channel =
- consumer_find_channel(key);
+ struct lttng_consumer_channel *channel = consumer_find_channel(key);
if (channel) {
pthread_mutex_lock(&channel->lock);
*
* Metadata stream lock MUST be acquired.
*/
-enum sync_metadata_status lttng_kconsumer_sync_metadata(
- struct lttng_consumer_stream *metadata)
+enum sync_metadata_status lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
{
int ret;
enum sync_metadata_status status;
return status;
}
-static
-int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuf)
+static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuf)
{
int ret;
- ret = kernctl_get_subbuf_size(
- stream->wait_fd, &subbuf->info.data.subbuf_size);
+ ret = kernctl_get_subbuf_size(stream->wait_fd, &subbuf->info.data.subbuf_size);
if (ret) {
goto end;
}
- ret = kernctl_get_padded_subbuf_size(
- stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
+ ret = kernctl_get_padded_subbuf_size(stream->wait_fd,
+ &subbuf->info.data.padded_subbuf_size);
if (ret) {
goto end;
}
return ret;
}
-static
-int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuf)
+static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuf)
{
int ret;
goto end;
}
- ret = kernctl_get_metadata_version(
- stream->wait_fd, &subbuf->info.metadata.version);
+ ret = kernctl_get_metadata_version(stream->wait_fd, &subbuf->info.metadata.version);
if (ret) {
goto end;
}
return ret;
}
-static
-int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuf)
+static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuf)
{
int ret;
goto end;
}
- ret = kernctl_get_packet_size(
- stream->wait_fd, &subbuf->info.data.packet_size);
+ ret = kernctl_get_packet_size(stream->wait_fd, &subbuf->info.data.packet_size);
if (ret < 0) {
PERROR("Failed to get sub-buffer packet size");
goto end;
}
- ret = kernctl_get_content_size(
- stream->wait_fd, &subbuf->info.data.content_size);
+ ret = kernctl_get_content_size(stream->wait_fd, &subbuf->info.data.content_size);
if (ret < 0) {
PERROR("Failed to get sub-buffer content size");
goto end;
}
- ret = kernctl_get_timestamp_begin(
- stream->wait_fd, &subbuf->info.data.timestamp_begin);
+ ret = kernctl_get_timestamp_begin(stream->wait_fd, &subbuf->info.data.timestamp_begin);
if (ret < 0) {
PERROR("Failed to get sub-buffer begin timestamp");
goto end;
}
- ret = kernctl_get_timestamp_end(
- stream->wait_fd, &subbuf->info.data.timestamp_end);
+ ret = kernctl_get_timestamp_end(stream->wait_fd, &subbuf->info.data.timestamp_end);
if (ret < 0) {
PERROR("Failed to get sub-buffer end timestamp");
goto end;
}
- ret = kernctl_get_events_discarded(
- stream->wait_fd, &subbuf->info.data.events_discarded);
+ ret = kernctl_get_events_discarded(stream->wait_fd, &subbuf->info.data.events_discarded);
if (ret) {
PERROR("Failed to get sub-buffer events discarded count");
goto end;
}
ret = kernctl_get_sequence_number(stream->wait_fd,
- &subbuf->info.data.sequence_number.value);
+ &subbuf->info.data.sequence_number.value);
if (ret) {
/* May not be supported by older LTTng-modules. */
if (ret != -ENOTTY) {
subbuf->info.data.sequence_number.is_set = true;
}
- ret = kernctl_get_stream_id(
- stream->wait_fd, &subbuf->info.data.stream_id);
+ ret = kernctl_get_stream_id(stream->wait_fd, &subbuf->info.data.stream_id);
if (ret < 0) {
PERROR("Failed to get stream id");
goto end;
}
- ret = kernctl_get_instance_id(stream->wait_fd,
- &subbuf->info.data.stream_instance_id.value);
+ ret = kernctl_get_instance_id(stream->wait_fd, &subbuf->info.data.stream_instance_id.value);
if (ret) {
/* May not be supported by older LTTng-modules. */
if (ret != -ENOTTY) {
return ret;
}
-static
-enum get_next_subbuffer_status get_subbuffer_common(
- struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status get_subbuffer_common(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
{
int ret;
enum get_next_subbuffer_status status;
status = GET_NEXT_SUBBUFFER_STATUS_OK;
break;
case -ENODATA:
- case -EAGAIN:
+ case -EAGAIN:
/*
* The caller only expects -ENODATA when there is no data to
* read, but the kernel tracer returns -EAGAIN when there is
goto end;
}
- ret = stream->read_subbuffer_ops.extract_subbuffer_info(
- stream, subbuffer);
+ ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
if (ret) {
status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
}
return status;
}
-static
-enum get_next_subbuffer_status get_next_subbuffer_splice(
- struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status
+get_next_subbuffer_splice(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
{
- const enum get_next_subbuffer_status status =
- get_subbuffer_common(stream, subbuffer);
+ const enum get_next_subbuffer_status status = get_subbuffer_common(stream, subbuffer);
if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
goto end;
return status;
}
-static
-enum get_next_subbuffer_status get_next_subbuffer_mmap(
- struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status get_next_subbuffer_mmap(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
{
int ret;
enum get_next_subbuffer_status status;
goto end;
}
- subbuffer->buffer.buffer = lttng_buffer_view_init(
- addr, 0, subbuffer->info.data.padded_subbuf_size);
+ subbuffer->buffer.buffer =
+ lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
end:
return status;
}
-static
-enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status
+get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
{
int ret;
const char *addr;
bool coherent;
enum get_next_subbuffer_status status;
- ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
- &coherent);
+ ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, &coherent);
if (ret) {
goto end;
}
- ret = stream->read_subbuffer_ops.extract_subbuffer_info(
- stream, subbuffer);
+ ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
if (ret) {
goto end;
}
goto end;
}
- subbuffer->buffer.buffer = lttng_buffer_view_init(
- addr, 0, subbuffer->info.data.padded_subbuf_size);
+ subbuffer->buffer.buffer =
+ lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
- subbuffer->info.metadata.padded_subbuf_size,
- coherent ? "true" : "false");
+ subbuffer->info.metadata.padded_subbuf_size,
+ coherent ? "true" : "false");
end:
/*
* The caller only expects -ENODATA when there is no data to read, but
return status;
}
-static
-int put_next_subbuffer(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+static int put_next_subbuffer(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer __attribute__((unused)))
{
const int ret = kernctl_put_next_subbuf(stream->wait_fd);
return ret;
}
-static
-bool is_get_next_check_metadata_available(int tracer_fd)
+static bool is_get_next_check_metadata_available(int tracer_fd)
{
const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
const bool available = ret != -ENOTTY;
return available;
}
-static
-int signal_metadata(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+static int signal_metadata(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx __attribute__((unused)))
{
ASSERT_LOCKED(stream->metadata_rdv_lock);
return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
}
-static
-int lttng_kconsumer_set_stream_ops(
- struct lttng_consumer_stream *stream)
+static int lttng_kconsumer_set_stream_ops(struct lttng_consumer_stream *stream)
{
int ret = 0;
if (is_get_next_check_metadata_available(stream->wait_fd)) {
DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_metadata_check;
- ret = consumer_stream_enable_metadata_bucketization(
- stream);
+ get_next_subbuffer_metadata_check;
+ ret = consumer_stream_enable_metadata_bucketization(stream);
if (ret) {
goto end;
}
if (!stream->read_subbuffer_ops.get_next_subbuffer) {
if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_mmap;
+ stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_mmap;
} else {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_splice;
+ stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_splice;
}
}
if (stream->metadata_flag) {
- stream->read_subbuffer_ops.extract_subbuffer_info =
- extract_metadata_subbuffer_info;
+ stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info;
} else {
- stream->read_subbuffer_ops.extract_subbuffer_info =
- extract_data_subbuffer_info;
+ stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info;
if (stream->chan->is_live) {
- stream->read_subbuffer_ops.send_live_beacon =
- consumer_flush_kernel_index;
+ stream->read_subbuffer_ops.send_live_beacon = consumer_flush_kernel_index;
}
}
* no current trace chunk on the parent channel.
*/
if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
- stream->chan->trace_chunk) {
+ stream->chan->trace_chunk) {
ret = consumer_stream_create_output_files(stream, true);
if (ret) {
goto error;
}
stream->mmap_len = (size_t) mmap_len;
- stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
- MAP_PRIVATE, stream->wait_fd, 0);
+ stream->mmap_base =
+ mmap(NULL, stream->mmap_len, PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
if (stream->mmap_base == MAP_FAILED) {
PERROR("Error mmaping");
ret = -1;
/* There is still data so let's put back this subbuffer. */
ret = kernctl_put_subbuf(stream->wait_fd);
LTTNG_ASSERT(ret == 0);
- ret = 1; /* Data is pending */
+ ret = 1; /* Data is pending */
goto end;
}