/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
#include "kernel-consumer.h"
-extern struct lttng_consumer_global_data consumer_data;
+extern struct lttng_consumer_global_data the_consumer_data;
extern int consumer_poll_timeout;
/*
/*
* Take a snapshot of all the stream of a channel
* RCU read-side lock must be held across this function to ensure existence of
- * channel. The channel lock must be held by the caller.
+ * channel.
*
* Returns 0 on success, < 0 on error
*/
DBG("Kernel consumer snapshot channel %" PRIu64, key);
+ /* Prevent channel modifications while we perform the snapshot.*/
+ pthread_mutex_lock(&channel->lock);
+
rcu_read_lock();
/* Splice is not supported yet for channel snapshot. */
pthread_mutex_unlock(&stream->lock);
end:
rcu_read_unlock();
+ pthread_mutex_unlock(&channel->lock);
return ret;
}
/*
* Read the whole metadata available for a snapshot.
* RCU read-side lock must be held across this function to ensure existence of
- * metadata_channel. The channel lock must be held by the caller.
+ * metadata_channel.
*
* Returns 0 on success, < 0 on error
*/
metadata_stream = metadata_channel->metadata_stream;
assert(metadata_stream);
- pthread_mutex_lock(&metadata_stream->lock);
+ metadata_stream->read_subbuffer_ops.lock(metadata_stream);
assert(metadata_channel->trace_chunk);
assert(metadata_stream->trace_chunk);
ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
if (ret_read < 0) {
- if (ret_read != -EAGAIN) {
- ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
- ret_read);
- ret = ret_read;
- goto error_snapshot;
- }
- /* ret_read is negative at this point so we will exit the loop. */
- continue;
+ ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
+ ret_read);
+ ret = ret_read;
+ goto error_snapshot;
}
- } while (ret_read >= 0);
+ } while (ret_read > 0);
if (use_relayd) {
close_relayd_stream(metadata_stream);
ret = 0;
error_snapshot:
- pthread_mutex_unlock(&metadata_stream->lock);
+ metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
cds_list_del(&metadata_stream->send_node);
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
- ssize_t ret;
+ int ret_func;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttcomm_consumer_msg msg;
health_code_update();
- ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
- if (ret != sizeof(msg)) {
- if (ret > 0) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
- ret = -1;
+ {
+ ssize_t ret_recv;
+
+ ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
+ if (ret_recv != sizeof(msg)) {
+ if (ret_recv > 0) {
+ lttng_consumer_send_error(ctx,
+ LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ ret_recv = -1;
+ }
+ return ret_recv;
}
- return ret;
}
health_code_update();
case LTTNG_CONSUMER_ADD_CHANNEL:
{
struct lttng_consumer_channel *new_channel;
- int ret_recv;
+ int ret_send_status, ret_add_channel = 0;
const uint64_t chunk_id = msg.u.channel.chunk_id.value;
health_code_update();
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
health_code_update();
if (ctx->on_recv_channel != NULL) {
- ret_recv = ctx->on_recv_channel(new_channel);
- if (ret_recv == 0) {
- ret = consumer_add_channel(new_channel, ctx);
- } else if (ret_recv < 0) {
+ int ret_recv_channel =
+ ctx->on_recv_channel(new_channel);
+ if (ret_recv_channel == 0) {
+ ret_add_channel = consumer_add_channel(
+ new_channel, ctx);
+ } else if (ret_recv_channel < 0) {
goto end_nosignal;
}
} else {
- ret = consumer_add_channel(new_channel, ctx);
+ ret_add_channel =
+ consumer_add_channel(new_channel, ctx);
}
- if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
+ !ret_add_channel) {
int monitor_start_ret;
DBG("Consumer starting monitor timer");
ERR("Starting channel monitoring timer failed");
goto end_nosignal;
}
-
}
health_code_update();
/* If we received an error in add_channel, we need to report it. */
- if (ret < 0) {
- ret = consumer_send_status_msg(sock, ret);
- if (ret < 0) {
+ if (ret_add_channel < 0) {
+ ret_send_status = consumer_send_status_msg(
+ sock, ret_add_channel);
+ if (ret_send_status < 0) {
goto error_fatal;
}
goto end_nosignal;
struct lttng_consumer_stream *new_stream;
struct lttng_consumer_channel *channel;
int alloc_ret = 0;
+ int ret_send_status, ret_poll, ret_get_max_subbuf_size;
+ ssize_t ret_pipe_write, ret_recv;
/*
* Get stream's channel reference. Needed when adding the stream to the
health_code_update();
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_add_stream_fatal;
}
/* Blocking call */
health_poll_entry();
- ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
health_poll_exit();
- if (ret) {
+ if (ret_poll) {
goto error_add_stream_fatal;
}
health_code_update();
/* Get stream file descriptor from socket */
- ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
- if (ret != sizeof(fd)) {
+ ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+ if (ret_recv != sizeof(fd)) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ ret_func = ret_recv;
goto end;
}
* above recv() failed, the session daemon is notified through the
* error socket and the teardown is eventually done.
*/
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_add_stream_nosignal;
}
}
new_stream->wait_fd = fd;
- ret = kernctl_get_max_subbuf_size(new_stream->wait_fd,
- &new_stream->max_sb_size);
- if (ret < 0) {
+ ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
+ new_stream->wait_fd, &new_stream->max_sb_size);
+ if (ret_get_max_subbuf_size < 0) {
pthread_mutex_unlock(&channel->lock);
ERR("Failed to get kernel maximal subbuffer size");
goto error_add_stream_nosignal;
pthread_mutex_lock(&new_stream->lock);
if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
+ int ret_recv_stream = ctx->on_recv_stream(new_stream);
+ if (ret_recv_stream < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
consumer_stream_free(new_stream);
/* Send stream to relayd if the stream has an ID. */
if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
- ret = consumer_send_relayd_stream(new_stream,
- new_stream->chan->pathname);
- if (ret < 0) {
+ int ret_send_relayd_stream;
+
+ ret_send_relayd_stream = consumer_send_relayd_stream(
+ new_stream, new_stream->chan->pathname);
+ if (ret_send_relayd_stream < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
consumer_stream_free(new_stream);
* to send the "streams_sent" command to relayd.
*/
if (channel->streams_sent_to_relayd) {
- ret = consumer_send_relayd_streams_sent(
- new_stream->net_seq_idx);
- if (ret < 0) {
+ int ret_send_relayd_streams_sent;
+
+ ret_send_relayd_streams_sent =
+ consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret_send_relayd_streams_sent < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
goto error_add_stream_nosignal;
health_code_update();
- ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
- if (ret < 0) {
+ ret_pipe_write = lttng_pipe_write(
+ stream_pipe, &new_stream, sizeof(new_stream));
+ if (ret_pipe_write < 0) {
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
case LTTNG_CONSUMER_STREAMS_SENT:
{
struct lttng_consumer_channel *channel;
+ int ret_send_status;
/*
* Get stream's channel reference. Needed when adding the stream to the
/*
* Send status code to session daemon.
*/
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0 ||
+ ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
/* Somehow, the session daemon is not responding anymore. */
goto error_streams_sent_nosignal;
}
health_code_update();
/* Send stream to relayd if the stream has an ID. */
if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
- ret = consumer_send_relayd_streams_sent(
+ int ret_send_relay_streams;
+
+ ret_send_relay_streams = consumer_send_relayd_streams_sent(
msg.u.sent_streams.net_seq_idx);
- if (ret < 0) {
+ if (ret_send_relay_streams < 0) {
goto error_streams_sent_nosignal;
}
channel->streams_sent_to_relayd = true;
{
uint64_t index = msg.u.destroy_relayd.net_seq_idx;
struct consumer_relayd_sock_pair *relayd;
+ int ret_send_status;
DBG("Kernel consumer destroying relayd %" PRIu64, index);
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
}
case LTTNG_CONSUMER_DATA_PENDING:
{
- int32_t ret;
+ int32_t ret_data_pending;
uint64_t id = msg.u.data_pending.session_id;
+ ssize_t ret_send;
DBG("Kernel consumer data pending command for id %" PRIu64, id);
- ret = consumer_data_pending(id);
+ ret_data_pending = consumer_data_pending(id);
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
- if (ret < 0) {
+ ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
+ sizeof(ret_data_pending));
+ if (ret_send < 0) {
PERROR("send data pending ret code");
goto error_fatal;
}
{
struct lttng_consumer_channel *channel;
uint64_t key = msg.u.snapshot_channel.key;
+ int ret_send_status;
channel = consumer_find_channel(key);
if (!channel) {
ERR("Channel %" PRIu64 " not found", key);
ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
} else {
- pthread_mutex_lock(&channel->lock);
if (msg.u.snapshot_channel.metadata == 1) {
- ret = lttng_kconsumer_snapshot_metadata(channel, key,
+ int ret_snapshot;
+
+ ret_snapshot = lttng_kconsumer_snapshot_metadata(
+ channel, key,
msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id, ctx);
- if (ret < 0) {
+ msg.u.snapshot_channel.relayd_id,
+ ctx);
+ if (ret_snapshot < 0) {
ERR("Snapshot metadata failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
}
} else {
- ret = lttng_kconsumer_snapshot_channel(channel, key,
+ int ret_snapshot;
+
+ ret_snapshot = lttng_kconsumer_snapshot_channel(
+ channel, key,
msg.u.snapshot_channel.pathname,
msg.u.snapshot_channel.relayd_id,
- msg.u.snapshot_channel.nb_packets_per_stream,
+ msg.u.snapshot_channel
+ .nb_packets_per_stream,
ctx);
- if (ret < 0) {
+ if (ret_snapshot < 0) {
ERR("Snapshot channel failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
}
}
- pthread_mutex_unlock(&channel->lock);
}
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
{
uint64_t key = msg.u.destroy_channel.key;
struct lttng_consumer_channel *channel;
+ int ret_send_status;
channel = consumer_find_channel(key);
if (!channel) {
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_destroy_channel;
}
case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
{
int channel_monitor_pipe;
+ int ret_send_status, ret_set_channel_monitor_pipe;
+ ssize_t ret_recv;
ret_code = LTTCOMM_CONSUMERD_SUCCESS;
/* Successfully received the command's type. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
goto error_fatal;
}
- ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
- 1);
- if (ret != sizeof(channel_monitor_pipe)) {
+ ret_recv = lttcomm_recv_fds_unix_sock(
+ sock, &channel_monitor_pipe, 1);
+ if (ret_recv != sizeof(channel_monitor_pipe)) {
ERR("Failed to receive channel monitor pipe");
goto error_fatal;
}
DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
- ret = consumer_timer_thread_set_channel_monitor_pipe(
- channel_monitor_pipe);
- if (!ret) {
+ ret_set_channel_monitor_pipe =
+ consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret_set_channel_monitor_pipe) {
int flags;
+ int ret_fcntl;
ret_code = LTTCOMM_CONSUMERD_SUCCESS;
/* Set the pipe as non-blocking. */
- ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
- if (ret == -1) {
+ ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret_fcntl == -1) {
PERROR("fcntl get flags of the channel monitoring pipe");
goto error_fatal;
}
- flags = ret;
+ flags = ret_fcntl;
- ret = fcntl(channel_monitor_pipe, F_SETFL,
+ ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
flags | O_NONBLOCK);
- if (ret == -1) {
+ if (ret_fcntl == -1) {
PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
goto error_fatal;
}
} else {
ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
}
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
goto error_fatal;
}
break;
{
struct lttng_consumer_channel *channel;
uint64_t key = msg.u.rotate_channel.key;
+ int ret_send_status;
DBG("Consumer rotate channel %" PRIu64, key);
/*
* Sample the rotate position of all the streams in this channel.
*/
- ret = lttng_consumer_rotate_channel(channel, key,
+ int ret_rotate_channel;
+
+ ret_rotate_channel = lttng_consumer_rotate_channel(
+ channel, key,
msg.u.rotate_channel.relayd_id,
- msg.u.rotate_channel.metadata,
- ctx);
- if (ret < 0) {
+ msg.u.rotate_channel.metadata, ctx);
+ if (ret_rotate_channel < 0) {
ERR("Rotate channel failed");
ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
}
health_code_update();
}
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_rotate_channel;
}
if (channel) {
/* Rotate the streams that are ready right now. */
- ret = lttng_consumer_rotate_ready_streams(
+ int ret_rotate;
+
+ ret_rotate = lttng_consumer_rotate_ready_streams(
channel, key, ctx);
- if (ret < 0) {
+ if (ret_rotate < 0) {
ERR("Rotate ready streams failed");
}
}
{
struct lttng_consumer_channel *channel;
uint64_t key = msg.u.clear_channel.key;
+ int ret_send_status;
channel = consumer_find_channel(key);
if (!channel) {
DBG("Channel %" PRIu64 " not found", key);
ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
} else {
- ret = lttng_consumer_clear_channel(channel);
- if (ret) {
+ int ret_clear_channel;
+
+ ret_clear_channel =
+ lttng_consumer_clear_channel(channel);
+ if (ret_clear_channel) {
ERR("Clear channel failed");
- ret_code = ret;
+ ret_code = ret_clear_channel;
}
health_code_update();
}
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
}
case LTTNG_CONSUMER_INIT:
{
+ int ret_send_status;
+
ret_code = lttng_consumer_init_command(ctx,
msg.u.init.sessiond_uuid);
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
{
const struct lttng_credentials credentials = {
- .uid = msg.u.create_trace_chunk.credentials.value.uid,
- .gid = msg.u.create_trace_chunk.credentials.value.gid,
+ .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
+ .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
};
const bool is_local_trace =
!msg.u.create_trace_chunk.relayd_id.is_set;
*/
if (is_local_trace) {
int chunk_dirfd;
+ int ret_send_status;
+ ssize_t ret_recv;
/* Acnowledge the reception of the command. */
- ret = consumer_send_status_msg(sock,
- LTTCOMM_CONSUMERD_SUCCESS);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(
+ sock, LTTCOMM_CONSUMERD_SUCCESS);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
- ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
- if (ret != sizeof(chunk_dirfd)) {
+ ret_recv = lttcomm_recv_fds_unix_sock(
+ sock, &chunk_dirfd, 1);
+ if (ret_recv != sizeof(chunk_dirfd)) {
ERR("Failed to receive trace chunk directory file descriptor");
goto error_fatal;
}
msg.u.close_trace_chunk.relayd_id.value;
struct lttcomm_consumer_close_trace_chunk_reply reply;
char path[LTTNG_PATH_MAX];
+ ssize_t ret_send;
ret_code = lttng_consumer_close_trace_chunk(
msg.u.close_trace_chunk.relayd_id.is_set ?
NULL, path);
reply.ret_code = ret_code;
reply.path_length = strlen(path) + 1;
- ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
- if (ret != sizeof(reply)) {
+ ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
+ if (ret_send != sizeof(reply)) {
goto error_fatal;
}
- ret = lttcomm_send_unix_sock(sock, path, reply.path_length);
- if (ret != reply.path_length) {
+ ret_send = lttcomm_send_unix_sock(
+ sock, path, reply.path_length);
+ if (ret_send != reply.path_length) {
goto error_fatal;
}
goto end_nosignal;
msg.u.trace_chunk_exists.chunk_id);
goto end_msg_sessiond;
}
+ case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+ {
+ const uint64_t key = msg.u.open_channel_packets.key;
+ struct lttng_consumer_channel *channel =
+ consumer_find_channel(key);
+
+ if (channel) {
+ pthread_mutex_lock(&channel->lock);
+ ret_code = lttng_consumer_open_channel_packets(channel);
+ pthread_mutex_unlock(&channel->lock);
+ } else {
+ WARN("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+ goto end_msg_sessiond;
+ }
default:
goto end_nosignal;
}
* Return 1 to indicate success since the 0 value can be a socket
* shutdown during the recv() or send() call.
*/
- ret = 1;
+ ret_func = 1;
goto end;
error_fatal:
/* This will issue a consumer stop. */
- ret = -1;
+ ret_func = -1;
goto end;
end_msg_sessiond:
/*
* the caller because the session daemon socket management is done
* elsewhere. Returning a negative code or 0 will shutdown the consumer.
*/
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- goto error_fatal;
+ {
+ int ret_send_status;
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ goto error_fatal;
+ }
}
- ret = 1;
+
+ ret_func = 1;
+
end:
health_code_update();
rcu_read_unlock();
- return ret;
+ return ret_func;
}
/*
}
static
-int get_subbuffer_common(struct lttng_consumer_stream *stream,
+enum get_next_subbuffer_status get_subbuffer_common(
+ struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
{
int ret;
+ enum get_next_subbuffer_status status;
ret = kernctl_get_next_subbuf(stream->wait_fd);
- if (ret) {
+ switch (ret) {
+ case 0:
+ status = GET_NEXT_SUBBUFFER_STATUS_OK;
+ break;
+ case -ENODATA:
+ case -EAGAIN:
+ /*
+ * The caller only expects -ENODATA when there is no data to
+ * read, but the kernel tracer returns -EAGAIN when there is
+ * currently no data for a non-finalized stream, and -ENODATA
+ * when there is no data for a finalized stream. Those can be
+ * combined into a -ENODATA return value.
+ */
+ status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
+ goto end;
+ default:
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
ret = stream->read_subbuffer_ops.extract_subbuffer_info(
- stream, subbuffer);
+ stream, subbuffer);
+ if (ret) {
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
+ }
end:
- return ret;
+ return status;
}
static
-int get_next_subbuffer_splice(struct lttng_consumer_stream *stream,
+enum get_next_subbuffer_status get_next_subbuffer_splice(
+ struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
{
- int ret;
+ const enum get_next_subbuffer_status status =
+ get_subbuffer_common(stream, subbuffer);
- ret = get_subbuffer_common(stream, subbuffer);
- if (ret) {
+ if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
goto end;
}
subbuffer->buffer.fd = stream->wait_fd;
end:
- return ret;
+ return status;
}
static
-int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream,
+enum get_next_subbuffer_status get_next_subbuffer_mmap(
+ struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
{
int ret;
+ enum get_next_subbuffer_status status;
const char *addr;
- ret = get_subbuffer_common(stream, subbuffer);
- if (ret) {
+ status = get_subbuffer_common(stream, subbuffer);
+ if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
goto end;
}
ret = get_current_subbuf_addr(stream, &addr);
if (ret) {
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
goto end;
}
subbuffer->buffer.buffer = lttng_buffer_view_init(
addr, 0, subbuffer->info.data.padded_subbuf_size);
end:
- return ret;
+ return status;
}
static
-int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
{
int ret;
const char *addr;
bool coherent;
+ enum get_next_subbuffer_status status;
ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
&coherent);
subbuffer->info.metadata.padded_subbuf_size,
coherent ? "true" : "false");
end:
- return ret;
+ /*
+ * The caller only expects -ENODATA when there is no data to read, but
+ * the kernel tracer returns -EAGAIN when there is currently no data
+ * for a non-finalized stream, and -ENODATA when there is no data for a
+ * finalized stream. Those can be combined into a -ENODATA return value.
+ */
+ switch (ret) {
+ case 0:
+ status = GET_NEXT_SUBBUFFER_STATUS_OK;
+ break;
+ case -ENODATA:
+ case -EAGAIN:
+ /*
+ * The caller only expects -ENODATA when there is no data to
+ * read, but the kernel tracer returns -EAGAIN when there is
+ * currently no data for a non-finalized stream, and -ENODATA
+ * when there is no data for a finalized stream. Those can be
+ * combined into a -ENODATA return value.
+ */
+ status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
+ break;
+ default:
+ status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
+ break;
+ }
+
+ return status;
}
static
static
bool is_get_next_check_metadata_available(int tracer_fd)
{
- return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) !=
- -ENOTTY;
+ const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
+ const bool available = ret != -ENOTTY;
+
+ if (ret == 0) {
+ /* get succeeded, make sure to put the subbuffer. */
+ kernctl_put_subbuf(tracer_fd);
+ }
+
+ return available;
+}
+
+static
+int signal_metadata(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ ASSERT_LOCKED(stream->metadata_rdv_lock);
+ return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
}
static
metadata_bucket_destroy(stream->metadata_bucket);
stream->metadata_bucket = NULL;
}
+
+ stream->read_subbuffer_ops.on_sleep = signal_metadata;
}
if (!stream->read_subbuffer_ops.get_next_subbuffer) {