*/
#define _LGPL_SOURCE
+#include "ust-consumer.hpp"
+
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/index/index.hpp>
+#include <common/optional.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/shm.hpp>
+#include <common/utils.hpp>
+
#include <lttng/ust-ctl.h>
#include <lttng/ust-sigbus.h>
+
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
-#include <inttypes.h>
#include <unistd.h>
#include <urcu/list.h>
-#include <signal.h>
-#include <stdbool.h>
-#include <stdint.h>
-#include <bin/lttng-consumerd/health-consumerd.hpp>
-#include <common/common.hpp>
-#include <common/sessiond-comm/sessiond-comm.hpp>
-#include <common/relayd/relayd.hpp>
-#include <common/compat/fcntl.hpp>
-#include <common/compat/endian.hpp>
-#include <common/consumer/consumer-metadata-cache.hpp>
-#include <common/consumer/consumer-stream.hpp>
-#include <common/consumer/consumer-timer.hpp>
-#include <common/utils.hpp>
-#include <common/index/index.hpp>
-#include <common/consumer/consumer.hpp>
-#include <common/shm.hpp>
-#include <common/optional.hpp>
-
-#include "ust-consumer.hpp"
-
-#define INT_MAX_STR_LEN 12 /* includes \0 */
+#define INT_MAX_STR_LEN 12 /* includes \0 */
extern struct lttng_consumer_global_data the_consumer_data;
extern int consumer_poll_timeout;
* Returns 0 on success or else a negative value.
*/
static int add_channel(struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx)
{
int ret = 0;
*
* Return NULL on error else the newly allocated stream object.
*/
-static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
- struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx, int *_alloc_ret)
+static struct lttng_consumer_stream *allocate_stream(int cpu,
+ int key,
+ struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx,
+ int *_alloc_ret)
{
int alloc_ret;
struct lttng_consumer_stream *stream = NULL;
LTTNG_ASSERT(channel);
LTTNG_ASSERT(ctx);
- stream = consumer_stream_create(
- channel,
- channel->key,
- key,
- channel->name,
- channel->relayd_id,
- channel->session_id,
- channel->trace_chunk,
- cpu,
- &alloc_ret,
- channel->type,
- channel->monitor);
+ stream = consumer_stream_create(channel,
+ channel->key,
+ key,
+ channel->name,
+ channel->relayd_id,
+ channel->session_id,
+ channel->trace_chunk,
+ cpu,
+ &alloc_ret,
+ channel->type,
+ channel->monitor);
if (stream == NULL) {
switch (alloc_ret) {
case -ENOENT:
* Returns 0 on success else a negative value.
*/
static int send_stream_to_thread(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx)
{
int ret;
struct lttng_pipe *stream_pipe;
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
- stream->metadata_flag ? "metadata" : "data",
- lttng_pipe_get_writefd(stream_pipe));
+ stream->metadata_flag ? "metadata" : "data",
+ lttng_pipe_get_writefd(stream_pipe));
if (stream->metadata_flag) {
consumer_del_stream_for_metadata(stream);
} else {
return ret;
}
-static
-int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
+static int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
{
- char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
+ char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
int ret;
strncpy(stream_shm_path, shm_path, PATH_MAX);
PERROR("snprintf");
goto end;
}
- strncat(stream_shm_path, cpu_nr,
- PATH_MAX - strlen(stream_shm_path) - 1);
+ strncat(stream_shm_path, cpu_nr, PATH_MAX - strlen(stream_shm_path) - 1);
ret = 0;
end:
return ret;
* Return 0 on success else a negative value.
*/
static int create_ust_streams(struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx)
{
int ret, cpu = 0;
struct lttng_ust_ctl_consumer_stream *ustream;
*/
cds_list_add_tail(&stream->send_node, &channel->streams.head);
- ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream,
- &stream->max_sb_size);
+ ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, &stream->max_sb_size);
if (ret < 0) {
- ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s",
- stream->name);
+ ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", stream->name);
goto error;
}
}
DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
- stream->name, stream->key, stream->relayd_stream_id);
+ stream->name,
+ stream->key,
+ stream->relayd_stream_id);
/* Set next CPU stream. */
channel->streams.count = ++cpu;
if (channel->monitor) {
/* Set metadata poll pipe if we created one */
memcpy(stream->ust_metadata_poll_pipe,
- ust_metadata_pipe,
- sizeof(ust_metadata_pipe));
+ ust_metadata_pipe,
+ sizeof(ust_metadata_pipe));
}
}
pthread_mutex_unlock(&stream->lock);
return ret;
}
-static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
- const struct lttng_credentials *session_credentials)
+static int open_ust_stream_fd(struct lttng_consumer_channel *channel,
+ int cpu,
+ const struct lttng_credentials *session_credentials)
{
char shm_path[PATH_MAX];
int ret;
goto error_shm_path;
}
return run_as_open(shm_path,
- O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
- lttng_credentials_get_uid(session_credentials),
- lttng_credentials_get_gid(session_credentials));
+ O_RDWR | O_CREAT | O_EXCL,
+ S_IRUSR | S_IWUSR,
+ lttng_credentials_get_uid(session_credentials),
+ lttng_credentials_get_gid(session_credentials));
error_shm_path:
return -1;
* Return 0 on success or else a negative value.
*/
static int create_ust_channel(struct lttng_consumer_channel *channel,
- struct lttng_ust_ctl_consumer_channel_attr *attr,
- struct lttng_ust_ctl_consumer_channel **ust_chanp)
+ struct lttng_ust_ctl_consumer_channel_attr *attr,
+ struct lttng_ust_ctl_consumer_channel **ust_chanp)
{
int ret, nr_stream_fds, i, j;
int *stream_fds;
LTTNG_ASSERT(channel->buffer_credentials.is_set);
DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
- "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
- "switch_timer_interval: %u, read_timer_interval: %u, "
- "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
- attr->num_subbuf, attr->switch_timer_interval,
- attr->read_timer_interval, attr->output, attr->type);
+ "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
+ "switch_timer_interval: %u, read_timer_interval: %u, "
+ "output: %d, type: %d",
+ attr->overwrite,
+ attr->subbuf_size,
+ attr->num_subbuf,
+ attr->switch_timer_interval,
+ attr->read_timer_interval,
+ attr->output,
+ attr->type);
if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA)
nr_stream_fds = 1;
goto error_alloc;
}
for (i = 0; i < nr_stream_fds; i++) {
- stream_fds[i] = open_ust_stream_fd(channel, i,
- &channel->buffer_credentials.value);
+ stream_fds[i] = open_ust_stream_fd(channel, i, &channel->buffer_credentials.value);
if (stream_fds[i] < 0) {
ret = -1;
goto error_open;
if (channel->shm_path[0]) {
char shm_path[PATH_MAX];
- closeret = get_stream_shm_path(shm_path,
- channel->shm_path, j);
+ closeret = get_stream_shm_path(shm_path, channel->shm_path, j);
if (closeret) {
ERR("Cannot get stream shm path");
}
closeret = run_as_unlink(shm_path,
- lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
- channel->buffer_credentials)),
- lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
- channel->buffer_credentials)));
+ lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+ channel->buffer_credentials)),
+ lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+ channel->buffer_credentials)));
if (closeret) {
PERROR("unlink %s", shm_path);
}
/* Try to rmdir all directories under shm_path root. */
if (channel->root_shm_path[0]) {
(void) run_as_rmdir_recursive(channel->root_shm_path,
- lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
- channel->buffer_credentials)),
- lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
- channel->buffer_credentials)),
- LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
+ lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+ channel->buffer_credentials)),
+ lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+ channel->buffer_credentials)),
+ LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
}
free(stream_fds);
error_alloc:
* Return 0 on success or else a negative value.
*/
static int send_channel_to_sessiond_and_relayd(int sock,
- struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx, int *relayd_error)
+ struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx,
+ int *relayd_error)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_consumer_stream *stream;
DBG("UST consumer sending channel %s to sessiond", channel->name);
if (channel->relayd_id != (uint64_t) -1ULL) {
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
/* Try to send the stream to the relayd if one is available. */
DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd",
- stream->key, channel->name);
+ stream->key,
+ channel->name);
ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
if (ret < 0) {
/*
}
/* The channel was sent successfully to the sessiond at this point. */
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
/* Send stream to session daemon. */
* MUST be destroyed by consumer_del_channel().
*/
static int ask_channel(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel,
- struct lttng_ust_ctl_consumer_channel_attr *attr)
+ struct lttng_consumer_channel *channel,
+ struct lttng_ust_ctl_consumer_channel_attr *attr)
{
int ret;
* On error, return a negative value else 0 on success.
*/
static int send_streams_to_thread(struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx)
{
int ret = 0;
struct lttng_consumer_stream *stream, *stmp;
LTTNG_ASSERT(ctx);
/* Send streams to the corresponding thread. */
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
-
+ cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
health_code_update();
/* Sending the stream to the thread. */
/* For each stream of the channel id, flush it. */
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
- &channel->key, &iter.iter, stream, node_channel_id.node) {
-
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
health_code_update();
pthread_mutex_lock(&stream->lock);
if (!stream->quiescent) {
ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
if (ret) {
- ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 ", channel name = '%s'",
- chan_key, channel->name);
+ ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64
+ ", channel name = '%s'",
+ chan_key,
+ channel->name);
ret = LTTNG_ERR_BUFFER_FLUSH_FAILED;
pthread_mutex_unlock(&stream->lock);
goto error;
}
stream->quiescent = true;
}
-next:
+ next:
pthread_mutex_unlock(&stream->lock);
}
+
+ /*
+ * Send one last buffer statistics update to the session daemon. This
+ * ensures that the session daemon gets at least one statistics update
+ * per channel even in the case of short-lived channels, such as when a
+ * short-lived app is traced in per-pid mode.
+ */
+ sample_and_send_channel_buffer_stats(channel);
error:
rcu_read_unlock();
return ret;
/* For each stream of the channel id, clear quiescent state. */
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
- &channel->key, &iter.iter, stream, node_channel_id.node) {
-
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
health_code_update();
pthread_mutex_lock(&stream->lock);
/* Send metadata stream to relayd if needed. */
if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
- ret = consumer_send_relayd_stream(metadata->metadata_stream,
- metadata->pathname);
+ ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname);
if (ret < 0) {
ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
goto error;
}
- ret = consumer_send_relayd_streams_sent(
- metadata->metadata_stream->net_seq_idx);
+ ret = consumer_send_relayd_streams_sent(metadata->metadata_stream->net_seq_idx);
if (ret < 0) {
ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
goto error;
* Returns 0 on success, < 0 on error
*/
static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
- uint64_t key, char *path, uint64_t relayd_id,
- struct lttng_consumer_local_data *ctx)
+ uint64_t key,
+ char *path,
+ uint64_t relayd_id,
+ struct lttng_consumer_local_data *ctx)
{
int ret = 0;
struct lttng_consumer_stream *metadata_stream;
LTTNG_ASSERT(ctx);
ASSERT_RCU_READ_LOCKED();
- DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
- key, path);
+ DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
rcu_read_lock();
metadata_stream->net_seq_idx = relayd_id;
ret = consumer_send_relayd_stream(metadata_stream, path);
} else {
- ret = consumer_stream_create_output_files(metadata_stream,
- false);
+ ret = consumer_stream_create_output_files(metadata_stream, false);
}
if (ret < 0) {
goto error_stream;
return ret;
}
-static
-int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
- const char **addr)
+static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr)
{
int ret;
unsigned long mmap_offset;
mmap_base = (const char *) lttng_ust_ctl_get_mmap_base(stream->ustream);
if (!mmap_base) {
- ERR("Failed to get mmap base for stream `%s`",
- stream->name);
+ ERR("Failed to get mmap base for stream `%s`", stream->name);
ret = -EPERM;
goto error;
}
*addr = mmap_base + mmap_offset;
error:
return ret;
-
}
/*
* Returns 0 on success, < 0 on error
*/
static int snapshot_channel(struct lttng_consumer_channel *channel,
- uint64_t key, char *path, uint64_t relayd_id,
- uint64_t nb_packets_per_stream,
- struct lttng_consumer_local_data *ctx)
+ uint64_t key,
+ char *path,
+ uint64_t relayd_id,
+ uint64_t nb_packets_per_stream,
+ struct lttng_consumer_local_data *ctx)
{
int ret;
unsigned use_relayd = 0;
LTTNG_ASSERT(!channel->monitor);
DBG("UST consumer snapshot channel %" PRIu64, key);
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
/* Lock stream because we are about to change its state. */
if (use_relayd) {
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
- goto error_unlock;
+ goto error_close_stream;
}
} else {
- ret = consumer_stream_create_output_files(stream,
- false);
+ ret = consumer_stream_create_output_files(stream, false);
if (ret < 0) {
- goto error_unlock;
+ goto error_close_stream;
}
- DBG("UST consumer snapshot stream (%" PRIu64 ")",
- stream->key);
+ DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
}
/*
if (!stream->quiescent) {
ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
if (ret < 0) {
- ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 ", channel name = '%s'",
- channel->key, channel->name);
+ ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64
+ ", channel name = '%s'",
+ channel->key,
+ channel->name);
goto error_unlock;
}
}
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking UST snapshot");
- goto error_unlock;
+ goto error_close_stream;
}
ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced UST snapshot position");
- goto error_unlock;
+ goto error_close_stream;
}
ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd UST snapshot position");
- goto error_unlock;
+ goto error_close_stream;
}
/*
* daemon should never send a maximum stream size that is lower than
* subbuffer size.
*/
- consumed_pos = consumer_get_consume_start_pos(consumed_pos,
- produced_pos, nb_packets_per_stream,
- stream->max_sb_size);
+ consumed_pos = consumer_get_consume_start_pos(
+ consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size);
while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
goto error_put_subbuf;
}
- subbuf_view = lttng_buffer_view_init(
- subbuf_addr, 0, padded_len);
+ subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
read_len = lttng_consumer_on_read_subbuffer_mmap(
- stream, &subbuf_view, padded_len - len);
+ stream, &subbuf_view, padded_len - len);
if (use_relayd) {
if (read_len != len) {
ret = -EPERM;
}
/* Simply close the stream so we can use it on the next snapshot. */
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
pthread_mutex_unlock(&stream->lock);
}
ERR("Snapshot lttng_ust_ctl_put_subbuf");
}
error_close_stream:
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
error_unlock:
pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return ret;
}
-static
-void metadata_stream_reset_cache_consumed_position(
- struct lttng_consumer_stream *stream)
+static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
{
ASSERT_LOCKED(stream->lock);
- DBG("Reset metadata cache of session %" PRIu64,
- stream->chan->session_id);
+ DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id);
stream->ust_metadata_pushed = 0;
}
* the metadata cache flush to concurrently progress in order to
* complete.
*/
-int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
- uint64_t len, uint64_t version,
- struct lttng_consumer_channel *channel, int timer, int wait)
+int lttng_ustconsumer_recv_metadata(int sock,
+ uint64_t key,
+ uint64_t offset,
+ uint64_t len,
+ uint64_t version,
+ struct lttng_consumer_channel *channel,
+ int timer,
+ int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
pthread_mutex_lock(&channel->metadata_cache->lock);
cache_write_status = consumer_metadata_cache_write(
- channel->metadata_cache, offset, len, version,
- metadata_str);
+ channel->metadata_cache, offset, len, version, metadata_str);
pthread_mutex_unlock(&channel->metadata_cache->lock);
switch (cache_write_status) {
case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
*/
if (channel->metadata_stream != NULL) {
pthread_mutex_lock(&channel->metadata_stream->lock);
- metadata_stream_reset_cache_consumed_position(
- channel->metadata_stream);
+ metadata_stream_reset_cache_consumed_position(channel->metadata_stream);
pthread_mutex_unlock(&channel->metadata_stream->lock);
} else {
/* Validate we are in snapshot mode. */
* Return 1 on success else a negative value or 0.
*/
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
- int sock, struct pollfd *consumer_sockpoll)
+ int sock,
+ struct pollfd *consumer_sockpoll)
{
int ret_func;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret_recv != sizeof(msg)) {
DBG("Consumer received unexpected message size %zd (expects %zu)",
- ret_recv, sizeof(msg));
+ ret_recv,
+ sizeof(msg));
/*
* The ret value might 0 meaning an orderly shutdown but this is ok
* since the caller handles this.
*/
if (ret_recv > 0) {
- lttng_consumer_send_error(ctx,
- LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
ret_recv = -1;
}
return ret_recv;
uint32_t major = msg.u.relayd_sock.major;
uint32_t minor = msg.u.relayd_sock.minor;
enum lttcomm_sock_proto protocol =
- (enum lttcomm_sock_proto) msg.u.relayd_sock
- .relayd_socket_protocol;
+ (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
/* Session daemon status message are handled in the following call. */
consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
- msg.u.relayd_sock.type, ctx, sock,
- consumer_sockpoll, msg.u.relayd_sock.session_id,
- msg.u.relayd_sock.relayd_session_id, major,
- minor, protocol);
+ msg.u.relayd_sock.type,
+ ctx,
+ sock,
+ consumer_sockpoll,
+ msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id,
+ major,
+ minor,
+ protocol);
goto end_nosignal;
}
case LTTNG_CONSUMER_DESTROY_RELAYD:
is_data_pending = consumer_data_pending(id);
/* Send back returned value to session daemon */
- ret_send = lttcomm_send_unix_sock(sock, &is_data_pending,
- sizeof(is_data_pending));
+ ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, sizeof(is_data_pending));
if (ret_send < 0) {
- DBG("Error when sending the data pending ret code: %zd",
- ret_send);
+ DBG("Error when sending the data pending ret code: %zd", ret_send);
goto error_fatal;
}
/* Create a plain object and reserve a channel key. */
channel = consumer_allocate_channel(
- msg.u.ask_channel.key,
- msg.u.ask_channel.session_id,
- msg.u.ask_channel.chunk_id.is_set ?
- &chunk_id : NULL,
- msg.u.ask_channel.pathname,
- msg.u.ask_channel.name,
- msg.u.ask_channel.relayd_id,
- (enum lttng_event_output) msg.u.ask_channel.output,
- msg.u.ask_channel.tracefile_size,
- msg.u.ask_channel.tracefile_count,
- msg.u.ask_channel.session_id_per_pid,
- msg.u.ask_channel.monitor,
- msg.u.ask_channel.live_timer_interval,
- msg.u.ask_channel.is_live,
- msg.u.ask_channel.root_shm_path,
- msg.u.ask_channel.shm_path);
+ msg.u.ask_channel.key,
+ msg.u.ask_channel.session_id,
+ msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL,
+ msg.u.ask_channel.pathname,
+ msg.u.ask_channel.name,
+ msg.u.ask_channel.relayd_id,
+ (enum lttng_event_output) msg.u.ask_channel.output,
+ msg.u.ask_channel.tracefile_size,
+ msg.u.ask_channel.tracefile_count,
+ msg.u.ask_channel.session_id_per_pid,
+ msg.u.ask_channel.monitor,
+ msg.u.ask_channel.live_timer_interval,
+ msg.u.ask_channel.is_live,
+ msg.u.ask_channel.root_shm_path,
+ msg.u.ask_channel.shm_path);
if (!channel) {
goto end_channel_error;
}
- LTTNG_OPTIONAL_SET(&channel->buffer_credentials,
- buffer_credentials);
+ LTTNG_OPTIONAL_SET(&channel->buffer_credentials, buffer_credentials);
/*
* Assign UST application UID to the channel. This value is ignored for
attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
attr.chan_id = msg.u.ask_channel.chan_id;
memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
- attr.blocking_timeout= msg.u.ask_channel.blocking_timeout;
+ attr.blocking_timeout = msg.u.ask_channel.blocking_timeout;
/* Match channel buffer type to the UST abi. */
switch (msg.u.ask_channel.output) {
if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
int ret_allocate;
- ret_allocate = consumer_metadata_cache_allocate(
- channel);
+ ret_allocate = consumer_metadata_cache_allocate(channel);
if (ret_allocate < 0) {
ERR("Allocating metadata cache");
goto end_channel_error;
} else {
int monitor_start_ret;
- consumer_timer_live_start(channel,
- msg.u.ask_channel.live_timer_interval);
+ consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
monitor_start_ret = consumer_timer_monitor_start(
- channel,
- msg.u.ask_channel.monitor_timer_interval);
+ channel, msg.u.ask_channel.monitor_timer_interval);
if (monitor_start_ret < 0) {
ERR("Starting channel monitoring timer failed");
goto end_channel_error;
health_code_update();
/* Send the channel to sessiond (and relayd, if applicable). */
- ret = send_channel_to_sessiond_and_relayd(
- sock, found_channel, ctx, &relayd_err);
+ ret = send_channel_to_sessiond_and_relayd(sock, found_channel, ctx, &relayd_err);
if (ret < 0) {
if (relayd_err) {
/*
}
/* List MUST be empty after or else it could be reused. */
LTTNG_ASSERT(cds_list_empty(&found_channel->streams.head));
-end_get_channel:
+ end_get_channel:
goto end_msg_sessiond;
-error_get_channel_fatal:
+ error_get_channel_fatal:
goto error_fatal;
-end_get_channel_nosignal:
+ end_get_channel_nosignal:
goto end_nosignal;
}
case LTTNG_CONSUMER_DESTROY_CHANNEL:
{
int ret;
- ret = clear_quiescent_channel(
- msg.u.clear_quiescent_channel.key);
+ ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key);
if (ret != 0) {
ret_code = (lttcomm_return_code) ret;
}
uint64_t version = msg.u.push_metadata.version;
struct lttng_consumer_channel *found_channel;
- DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
- len);
+ DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
found_channel = consumer_find_channel(key);
if (!found_channel) {
health_code_update();
- ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
- version, found_channel, 0, 1);
+ ret = lttng_ustconsumer_recv_metadata(
+ sock, key, offset, len, version, found_channel, 0, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
ret_code = (lttcomm_return_code) ret;
goto end_push_metadata_msg_sessiond;
}
-end_push_metadata_msg_sessiond:
+ end_push_metadata_msg_sessiond:
goto end_msg_sessiond;
-error_push_metadata_fatal:
+ error_push_metadata_fatal:
goto error_fatal;
}
case LTTNG_CONSUMER_SETUP_METADATA:
int ret_snapshot;
ret_snapshot = snapshot_metadata(found_channel,
- key,
- msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id,
- ctx);
+ key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id,
+ ctx);
if (ret_snapshot < 0) {
ERR("Snapshot metadata failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
} else {
int ret_snapshot;
- ret_snapshot = snapshot_channel(found_channel,
- key,
- msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id,
- msg.u.snapshot_channel
- .nb_packets_per_stream,
- ctx);
+ ret_snapshot = snapshot_channel(
+ found_channel,
+ key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel.nb_packets_per_stream,
+ ctx);
if (ret_snapshot < 0) {
ERR("Snapshot channel failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
uint64_t id = msg.u.discarded_events.session_id;
uint64_t key = msg.u.discarded_events.channel_key;
- DBG("UST consumer discarded events command for session id %"
- PRIu64, id);
+ DBG("UST consumer discarded events command for session id %" PRIu64, id);
rcu_read_lock();
pthread_mutex_lock(&the_consumer_data.lock);
*/
discarded_events = 0;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct, &id,
- &iter.iter, stream, node_session_id.node) {
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct,
+ &id,
+ &iter.iter,
+ stream,
+ node_session_id.node)
+ {
if (stream->chan->key == key) {
discarded_events = stream->chan->discarded_events;
break;
pthread_mutex_unlock(&the_consumer_data.lock);
rcu_read_unlock();
- DBG("UST consumer discarded events command for session id %"
- PRIu64 ", channel key %" PRIu64, id, key);
+ DBG("UST consumer discarded events command for session id %" PRIu64
+ ", channel key %" PRIu64,
+ id,
+ key);
health_code_update();
}
case LTTNG_CONSUMER_LOST_PACKETS:
{
- int ret;
+ int ret;
uint64_t lost_packets;
struct lttng_ht_iter iter;
struct lttng_ht *ht;
uint64_t id = msg.u.lost_packets.session_id;
uint64_t key = msg.u.lost_packets.channel_key;
- DBG("UST consumer lost packets command for session id %"
- PRIu64, id);
+ DBG("UST consumer lost packets command for session id %" PRIu64, id);
rcu_read_lock();
pthread_mutex_lock(&the_consumer_data.lock);
* to extract the information we need, we default to 0 if not
* found (no packets lost if the channel is not yet in use).
*/
- lost_packets = 0;
+ lost_packets = 0;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct, &id,
- &iter.iter, stream, node_session_id.node) {
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct,
+ &id,
+ &iter.iter,
+ stream,
+ node_session_id.node)
+ {
if (stream->chan->key == key) {
- lost_packets = stream->chan->lost_packets;
+ lost_packets = stream->chan->lost_packets;
break;
}
}
pthread_mutex_unlock(&the_consumer_data.lock);
rcu_read_unlock();
- DBG("UST consumer lost packets command for session id %"
- PRIu64 ", channel key %" PRIu64, id, key);
+ DBG("UST consumer lost packets command for session id %" PRIu64
+ ", channel key %" PRIu64,
+ id,
+ key);
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &lost_packets,
- sizeof(lost_packets));
+ ret = lttcomm_send_unix_sock(sock, &lost_packets, sizeof(lost_packets));
if (ret < 0) {
PERROR("send lost packets");
goto error_fatal;
}
case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
{
- int channel_monitor_pipe, ret_send,
- ret_set_channel_monitor_pipe;
+ int channel_monitor_pipe, ret_send, ret_set_channel_monitor_pipe;
ssize_t ret_recv;
ret_code = LTTCOMM_CONSUMERD_SUCCESS;
goto error_fatal;
}
- ret_recv = lttcomm_recv_fds_unix_sock(
- sock, &channel_monitor_pipe, 1);
+ ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1);
if (ret_recv != sizeof(channel_monitor_pipe)) {
ERR("Failed to receive channel monitor pipe");
goto error_fatal;
DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
ret_set_channel_monitor_pipe =
- consumer_timer_thread_set_channel_monitor_pipe(
- channel_monitor_pipe);
+ consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe);
if (!ret_set_channel_monitor_pipe) {
int flags;
int ret_fcntl;
}
flags = ret_fcntl;
- ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
- flags | O_NONBLOCK);
+ ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK);
if (ret_fcntl == -1) {
PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
goto error_fatal;
* this channel.
*/
rotate_channel = lttng_consumer_rotate_channel(
- found_channel, key,
- msg.u.rotate_channel.relayd_id);
+ found_channel, key, msg.u.rotate_channel.relayd_id);
if (rotate_channel < 0) {
ERR("Rotate channel failed");
ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
int ret_rotate_read_streams;
ret_rotate_read_streams =
- lttng_consumer_rotate_ready_streams(
- found_channel, key);
+ lttng_consumer_rotate_ready_streams(found_channel, key);
if (ret_rotate_read_streams < 0) {
ERR("Rotate channel failed");
}
}
break;
-end_rotate_channel_nosignal:
+ end_rotate_channel_nosignal:
goto end_nosignal;
}
case LTTNG_CONSUMER_CLEAR_CHANNEL:
} else {
int ret_clear_channel;
- ret_clear_channel = lttng_consumer_clear_channel(
- found_channel);
+ ret_clear_channel = lttng_consumer_clear_channel(found_channel);
if (ret_clear_channel) {
ERR("Clear channel failed key %" PRIu64, key);
ret_code = (lttcomm_return_code) ret_clear_channel;
int ret_send_status;
lttng_uuid sessiond_uuid;
- std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
- sessiond_uuid.begin());
+ std::copy(std::begin(msg.u.init.sessiond_uuid),
+ std::end(msg.u.init.sessiond_uuid),
+ sessiond_uuid.begin());
ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
health_code_update();
ret_send_status = consumer_send_status_msg(sock, ret_code);
case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
{
const struct lttng_credentials credentials = {
- .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
- .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
+ .uid = LTTNG_OPTIONAL_INIT_VALUE(
+ msg.u.create_trace_chunk.credentials.value.uid),
+ .gid = LTTNG_OPTIONAL_INIT_VALUE(
+ msg.u.create_trace_chunk.credentials.value.gid),
};
- const bool is_local_trace =
- !msg.u.create_trace_chunk.relayd_id.is_set;
- const uint64_t relayd_id =
- msg.u.create_trace_chunk.relayd_id.value;
- const char *chunk_override_name =
- *msg.u.create_trace_chunk.override_name ?
- msg.u.create_trace_chunk.override_name :
- NULL;
+ const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set;
+ const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
+ const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
+ msg.u.create_trace_chunk.override_name :
+ NULL;
struct lttng_directory_handle *chunk_directory_handle = NULL;
/*
ssize_t ret_recv;
/* Acnowledge the reception of the command. */
- ret_send_status = consumer_send_status_msg(
- sock, LTTCOMM_CONSUMERD_SUCCESS);
+ ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
/*
* Receive trace chunk domain dirfd.
*/
- ret_recv = lttcomm_recv_fds_unix_sock(
- sock, &chunk_dirfd, 1);
+ ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
if (ret_recv != sizeof(chunk_dirfd)) {
ERR("Failed to receive trace chunk domain directory file descriptor");
goto error_fatal;
}
- DBG("Received trace chunk domain directory fd (%d)",
- chunk_dirfd);
- chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
- chunk_dirfd);
+ DBG("Received trace chunk domain directory fd (%d)", chunk_dirfd);
+ chunk_directory_handle =
+ lttng_directory_handle_create_from_dirfd(chunk_dirfd);
if (!chunk_directory_handle) {
ERR("Failed to initialize chunk domain directory handle from directory file descriptor");
if (close(chunk_dirfd)) {
}
ret_code = lttng_consumer_create_trace_chunk(
- !is_local_trace ? &relayd_id : NULL,
- msg.u.create_trace_chunk.session_id,
- msg.u.create_trace_chunk.chunk_id,
- (time_t) msg.u.create_trace_chunk
- .creation_timestamp,
- chunk_override_name,
- msg.u.create_trace_chunk.credentials.is_set ?
- &credentials :
- NULL,
- chunk_directory_handle);
+ !is_local_trace ? &relayd_id : NULL,
+ msg.u.create_trace_chunk.session_id,
+ msg.u.create_trace_chunk.chunk_id,
+ (time_t) msg.u.create_trace_chunk.creation_timestamp,
+ chunk_override_name,
+ msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL,
+ chunk_directory_handle);
lttng_directory_handle_put(chunk_directory_handle);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
{
enum lttng_trace_chunk_command_type close_command =
- (lttng_trace_chunk_command_type)
- msg.u.close_trace_chunk.close_command.value;
- const uint64_t relayd_id =
- msg.u.close_trace_chunk.relayd_id.value;
+ (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
+ const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value;
struct lttcomm_consumer_close_trace_chunk_reply reply;
char closed_trace_chunk_path[LTTNG_PATH_MAX] = {};
int ret;
ret_code = lttng_consumer_close_trace_chunk(
- msg.u.close_trace_chunk.relayd_id.is_set ?
- &relayd_id :
- NULL,
- msg.u.close_trace_chunk.session_id,
- msg.u.close_trace_chunk.chunk_id,
- (time_t) msg.u.close_trace_chunk.close_timestamp,
- msg.u.close_trace_chunk.close_command.is_set ?
- &close_command :
- NULL, closed_trace_chunk_path);
+ msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL,
+ msg.u.close_trace_chunk.session_id,
+ msg.u.close_trace_chunk.chunk_id,
+ (time_t) msg.u.close_trace_chunk.close_timestamp,
+ msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL,
+ closed_trace_chunk_path);
reply.ret_code = ret_code;
reply.path_length = strlen(closed_trace_chunk_path) + 1;
ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
if (ret != sizeof(reply)) {
goto error_fatal;
}
- ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path,
- reply.path_length);
+ ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, reply.path_length);
if (ret != reply.path_length) {
goto error_fatal;
}
}
case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
{
- const uint64_t relayd_id =
- msg.u.trace_chunk_exists.relayd_id.value;
+ const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
ret_code = lttng_consumer_trace_chunk_exists(
- msg.u.trace_chunk_exists.relayd_id.is_set ?
- &relayd_id : NULL,
- msg.u.trace_chunk_exists.session_id,
- msg.u.trace_chunk_exists.chunk_id);
+ msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL,
+ msg.u.trace_chunk_exists.session_id,
+ msg.u.trace_chunk_exists.chunk_id);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
{
const uint64_t key = msg.u.open_channel_packets.key;
- struct lttng_consumer_channel *found_channel =
- consumer_find_channel(key);
+ struct lttng_consumer_channel *found_channel = consumer_find_channel(key);
if (found_channel) {
pthread_mutex_lock(&found_channel->lock);
- ret_code = lttng_consumer_open_channel_packets(
- found_channel);
+ ret_code = lttng_consumer_open_channel_packets(found_channel);
pthread_mutex_unlock(&found_channel->lock);
} else {
/*
return ret_func;
}
-int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream,
- int producer_active)
+int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream->ustream);
*
* Returns 0 on success, < 0 on error.
*/
-int lttng_ustconsumer_sample_snapshot_positions(
- struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream->ustream);
*
* Returns 0 on success, < 0 on error
*/
-int lttng_ustconsumer_get_produced_snapshot(
- struct lttng_consumer_stream *stream, unsigned long *pos)
+int lttng_ustconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream->ustream);
*
* Returns 0 on success, < 0 on error
*/
-int lttng_ustconsumer_get_consumed_snapshot(
- struct lttng_consumer_stream *stream, unsigned long *pos)
+int lttng_ustconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+ unsigned long *pos)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream->ustream);
return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos);
}
-int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
- int producer)
+int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream->ustream);
return lttng_ust_ctl_clear_buffer(stream->ustream);
}
-int lttng_ustconsumer_get_current_timestamp(
- struct lttng_consumer_stream *stream, uint64_t *ts)
+int lttng_ustconsumer_get_current_timestamp(struct lttng_consumer_stream *stream, uint64_t *ts)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream->ustream);
return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts);
}
-int lttng_ustconsumer_get_sequence_number(
- struct lttng_consumer_stream *stream, uint64_t *seq)
+int lttng_ustconsumer_get_sequence_number(struct lttng_consumer_stream *stream, uint64_t *seq)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream->ustream);
ERR("Cannot get stream shm path");
}
ret = run_as_unlink(shm_path,
- lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
- chan->buffer_credentials)),
- lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
- chan->buffer_credentials)));
+ lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+ chan->buffer_credentials)),
+ lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+ chan->buffer_credentials)));
if (ret) {
PERROR("unlink %s", shm_path);
}
lttng_ust_ctl_destroy_channel(chan->uchan);
/* Try to rmdir all directories under shm_path root. */
if (chan->root_shm_path[0]) {
- (void) run_as_rmdir_recursive(chan->root_shm_path,
- lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
- chan->buffer_credentials)),
- lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
- chan->buffer_credentials)),
- LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
+ (void) run_as_rmdir_recursive(
+ chan->root_shm_path,
+ lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
+ lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
+ LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
}
free(chan->stream_fds);
}
* Returns the number of bytes pushed from the cache into the ring buffer, or a
* negative value on error.
*/
-static
-int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
{
ssize_t write_len;
int ret;
pthread_mutex_lock(&stream->chan->metadata_cache->lock);
- if (stream->chan->metadata_cache->contents.size ==
- stream->ust_metadata_pushed) {
+ if (stream->chan->metadata_cache->contents.size == stream->ust_metadata_pushed) {
/*
* In the context of a user space metadata channel, a
* change in version can be detected in two ways:
* occur as part of the pre-consume) until the metadata size
* exceeded the cache size.
*/
- if (stream->metadata_version !=
- stream->chan->metadata_cache->version) {
+ if (stream->metadata_version != stream->chan->metadata_cache->version) {
metadata_stream_reset_cache_consumed_position(stream);
consumer_stream_metadata_set_version(stream,
- stream->chan->metadata_cache->version);
+ stream->chan->metadata_cache->version);
} else {
ret = 0;
goto end;
}
}
- write_len = lttng_ust_ctl_write_one_packet_to_channel(stream->chan->uchan,
- &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
- stream->chan->metadata_cache->contents.size -
- stream->ust_metadata_pushed);
+ write_len = lttng_ust_ctl_write_one_packet_to_channel(
+ stream->chan->uchan,
+ &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
+ stream->chan->metadata_cache->contents.size - stream->ust_metadata_pushed);
LTTNG_ASSERT(write_len != 0);
if (write_len < 0) {
ERR("Writing one metadata packet");
}
stream->ust_metadata_pushed += write_len;
- LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >=
- stream->ust_metadata_pushed);
+ LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
ret = write_len;
/*
return ret;
}
-
/*
* Sync metadata meaning request them to the session daemon and snapshot to the
* metadata thread can consumer them.
*
* The RCU read side lock must be held by the caller.
*/
-enum sync_metadata_status lttng_ustconsumer_sync_metadata(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *metadata_stream)
+enum sync_metadata_status
+lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata_stream)
{
int ret;
enum sync_metadata_status status;
*/
if (consumer_stream_is_deleted(metadata_stream)) {
DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
- metadata_stream->key);
+ metadata_stream->key);
status = SYNC_METADATA_STATUS_NO_DATA;
goto end;
}
ret = lttng_ust_ctl_snapshot(metadata_stream->ustream);
if (ret < 0) {
- ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret);
+ ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d",
+ ret);
status = SYNC_METADATA_STATUS_ERROR;
goto end;
}
* Return 0 on success else a negative value.
*/
static int notify_if_more_data(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx)
{
int ret;
struct lttng_ust_ctl_consumer_stream *ustream;
}
static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuf)
+ struct stream_subbuffer *subbuf)
{
int ret;
- ret = lttng_ust_ctl_get_subbuf_size(
- stream->ustream, &subbuf->info.data.subbuf_size);
+ ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &subbuf->info.data.subbuf_size);
if (ret) {
goto end;
}
- ret = lttng_ust_ctl_get_padded_subbuf_size(
- stream->ustream, &subbuf->info.data.padded_subbuf_size);
+ ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream,
+ &subbuf->info.data.padded_subbuf_size);
if (ret) {
goto end;
}
}
static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuf)
+ struct stream_subbuffer *subbuf)
{
int ret;
}
static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuf)
+ struct stream_subbuffer *subbuf)
{
int ret;
goto end;
}
- ret = lttng_ust_ctl_get_packet_size(
- stream->ustream, &subbuf->info.data.packet_size);
+ ret = lttng_ust_ctl_get_packet_size(stream->ustream, &subbuf->info.data.packet_size);
if (ret < 0) {
PERROR("Failed to get sub-buffer packet size");
goto end;
}
- ret = lttng_ust_ctl_get_content_size(
- stream->ustream, &subbuf->info.data.content_size);
+ ret = lttng_ust_ctl_get_content_size(stream->ustream, &subbuf->info.data.content_size);
if (ret < 0) {
PERROR("Failed to get sub-buffer content size");
goto end;
}
- ret = lttng_ust_ctl_get_timestamp_begin(
- stream->ustream, &subbuf->info.data.timestamp_begin);
+ ret = lttng_ust_ctl_get_timestamp_begin(stream->ustream,
+ &subbuf->info.data.timestamp_begin);
if (ret < 0) {
PERROR("Failed to get sub-buffer begin timestamp");
goto end;
}
- ret = lttng_ust_ctl_get_timestamp_end(
- stream->ustream, &subbuf->info.data.timestamp_end);
+ ret = lttng_ust_ctl_get_timestamp_end(stream->ustream, &subbuf->info.data.timestamp_end);
if (ret < 0) {
PERROR("Failed to get sub-buffer end timestamp");
goto end;
}
- ret = lttng_ust_ctl_get_events_discarded(
- stream->ustream, &subbuf->info.data.events_discarded);
+ ret = lttng_ust_ctl_get_events_discarded(stream->ustream,
+ &subbuf->info.data.events_discarded);
if (ret) {
PERROR("Failed to get sub-buffer events discarded count");
goto end;
}
ret = lttng_ust_ctl_get_sequence_number(stream->ustream,
- &subbuf->info.data.sequence_number.value);
+ &subbuf->info.data.sequence_number.value);
if (ret) {
/* May not be supported by older LTTng-modules. */
if (ret != -ENOTTY) {
subbuf->info.data.sequence_number.is_set = true;
}
- ret = lttng_ust_ctl_get_stream_id(
- stream->ustream, &subbuf->info.data.stream_id);
+ ret = lttng_ust_ctl_get_stream_id(stream->ustream, &subbuf->info.data.stream_id);
if (ret < 0) {
PERROR("Failed to get stream id");
goto end;
}
ret = lttng_ust_ctl_get_instance_id(stream->ustream,
- &subbuf->info.data.stream_instance_id.value);
+ &subbuf->info.data.stream_instance_id.value);
if (ret) {
/* May not be supported by older LTTng-modules. */
if (ret != -ENOTTY) {
}
static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+ struct stream_subbuffer *subbuffer)
{
int ret;
const char *addr;
- ret = stream->read_subbuffer_ops.extract_subbuffer_info(
- stream, subbuffer);
+ ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
if (ret) {
goto end;
}
goto end;
}
- subbuffer->buffer.buffer = lttng_buffer_view_init(
- addr, 0, subbuffer->info.data.padded_subbuf_size);
+ subbuffer->buffer.buffer =
+ lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL);
end:
return ret;
}
-static enum get_next_subbuffer_status get_next_subbuffer(
- struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status get_next_subbuffer(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
{
int ret;
enum get_next_subbuffer_status status;
status = GET_NEXT_SUBBUFFER_STATUS_OK;
break;
case -ENODATA:
- case -EAGAIN:
+ case -EAGAIN:
/*
* The caller only expects -ENODATA when there is no data to
* read, but the kernel tracer returns -EAGAIN when there is
return status;
}
-static enum get_next_subbuffer_status get_next_subbuffer_metadata(
- struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status
+get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
{
int ret;
bool cache_empty;
} else {
pthread_mutex_lock(&stream->chan->metadata_cache->lock);
cache_empty = stream->chan->metadata_cache->contents.size ==
- stream->ust_metadata_pushed;
+ stream->ust_metadata_pushed;
pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
}
} while (!got_subbuffer);
}
static int put_next_subbuffer(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer __attribute__((unused)))
+ struct stream_subbuffer *subbuffer __attribute__((unused)))
{
const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream);
}
static int signal_metadata(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx __attribute__((unused)))
+ struct lttng_consumer_local_data *ctx __attribute__((unused)))
{
ASSERT_LOCKED(stream->metadata_rdv_lock);
return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
}
-static int lttng_ustconsumer_set_stream_ops(
- struct lttng_consumer_stream *stream)
+static int lttng_ustconsumer_set_stream_ops(struct lttng_consumer_stream *stream)
{
int ret = 0;
stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
if (stream->metadata_flag) {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_metadata;
- stream->read_subbuffer_ops.extract_subbuffer_info =
- extract_metadata_subbuffer_info;
+ stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_metadata;
+ stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info;
stream->read_subbuffer_ops.reset_metadata =
- metadata_stream_reset_cache_consumed_position;
+ metadata_stream_reset_cache_consumed_position;
if (stream->chan->is_live) {
stream->read_subbuffer_ops.on_sleep = signal_metadata;
- ret = consumer_stream_enable_metadata_bucketization(
- stream);
+ ret = consumer_stream_enable_metadata_bucketization(stream);
if (ret) {
goto end;
}
}
} else {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer;
- stream->read_subbuffer_ops.extract_subbuffer_info =
- extract_data_subbuffer_info;
+ stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer;
+ stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info;
stream->read_subbuffer_ops.on_sleep = notify_if_more_data;
if (stream->chan->is_live) {
- stream->read_subbuffer_ops.send_live_beacon =
- consumer_flush_ust_index;
+ stream->read_subbuffer_ops.send_live_beacon = consumer_flush_ust_index;
}
}
* no current trace chunk on the parent channel.
*/
if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
- stream->chan->trace_chunk) {
+ stream->chan->trace_chunk) {
ret = consumer_stream_create_output_files(stream, true);
if (ret) {
goto error;
* whetnever ust_metadata_pushed is incremented, the associated
* metadata has been consumed from the metadata stream.
*/
- DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
- contiguous, pushed);
+ DBG("UST consumer metadata pending check: contiguous %" PRIu64
+ " vs pushed %" PRIu64,
+ contiguous,
+ pushed);
LTTNG_ASSERT(((int64_t) (contiguous - pushed)) >= 0);
if ((contiguous != pushed) ||
- (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
- ret = 1; /* Data is pending */
+ (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
+ ret = 1; /* Data is pending */
goto end;
}
} else {
*/
ret = lttng_ust_ctl_put_subbuf(stream->ustream);
LTTNG_ASSERT(ret == 0);
- ret = 1; /* Data is pending */
+ ret = 1; /* Data is pending */
goto end;
}
}
DBG("UST consumer closing all metadata streams");
rcu_read_lock();
- cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
- node.node) {
-
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
health_code_update();
pthread_mutex_lock(&stream->chan->lock);
lttng_ustconsumer_close_metadata(stream->chan);
pthread_mutex_unlock(&stream->chan->lock);
-
}
rcu_read_unlock();
}
* pushed out due to concurrent interaction with the session daemon.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer, int wait)
+ struct lttng_consumer_channel *channel,
+ int timer,
+ int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
request.uid = channel->ust_app_uid;
request.key = channel->key;
- DBG("Sending metadata request to sessiond, session id %" PRIu64
- ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
- request.session_id, request.session_id_per_pid, request.uid,
- request.key);
+ DBG("Sending metadata request to sessiond, session id %" PRIu64 ", per-pid %" PRIu64
+ ", app UID %u and channel key %" PRIu64,
+ request.session_id,
+ request.session_id_per_pid,
+ request.uid,
+ request.key);
pthread_mutex_lock(&ctx->metadata_socket_lock);
health_code_update();
- ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
- sizeof(request));
+ ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request));
if (ret < 0) {
ERR("Asking metadata to sessiond");
goto end;
health_code_update();
/* Receive the metadata from sessiond */
- ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
- sizeof(msg));
+ ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
- DBG("Consumer received unexpected message size %d (expects %zu)",
- ret, sizeof(msg));
+ DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg));
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
/*
* The ret value might 0 meaning an orderly shutdown but this is ok
if (msg.cmd_type == LTTNG_ERR_UND) {
/* No registry found */
- (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
- ret_code);
+ (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
ret = 0;
goto end;
} else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
health_code_update();
/* Tell session daemon we are ready to receive the metadata. */
- ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
- LTTCOMM_CONSUMERD_SUCCESS);
+ ret = consumer_send_status_msg(ctx->consumer_metadata_socket, LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0 || len == 0) {
/*
* Somehow, the session daemon is not responding anymore or there is
health_code_update();
- ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, version, channel, timer, wait);
+ ret = lttng_ustconsumer_recv_metadata(
+ ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
/*
* Return the ustctl call for the get stream id.
*/
-int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
- uint64_t *stream_id)
+int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id)
{
LTTNG_ASSERT(stream);
LTTNG_ASSERT(stream_id);