#include <common/sessiond-comm/sessiond-comm.h>
#include <common/sessiond-comm/relayd.h>
#include <common/compat/fcntl.h>
+#include <common/pipe.h>
#include <common/relayd/relayd.h>
#include <common/utils.h>
case LTTNG_CONSUMER_ADD_CHANNEL:
{
struct lttng_consumer_channel *new_channel;
+ int ret_recv;
/* First send a status message before receiving the fds. */
ret = consumer_send_status_msg(sock, ret_code);
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
-
DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
msg.u.channel.session_id, msg.u.channel.pathname,
};
if (ctx->on_recv_channel != NULL) {
- ret = ctx->on_recv_channel(new_channel);
- if (ret == 0) {
- consumer_add_channel(new_channel, ctx);
- } else if (ret < 0) {
+ ret_recv = ctx->on_recv_channel(new_channel);
+ if (ret_recv == 0) {
+ ret = consumer_add_channel(new_channel, ctx);
+ } else if (ret_recv < 0) {
goto end_nosignal;
}
} else {
- consumer_add_channel(new_channel, ctx);
+ ret = consumer_add_channel(new_channel, ctx);
+ }
+
+ /* If we received an error in add_channel, we need to report it. */
+ if (ret != 0) {
+ consumer_send_status_msg(sock, ret);
+ goto end_nosignal;
}
+
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_STREAM:
{
- int fd, stream_pipe;
+ int fd;
+ struct lttng_pipe *stream_pipe;
struct consumer_relayd_sock_pair *relayd = NULL;
struct lttng_consumer_stream *new_stream;
struct lttng_consumer_channel *channel;
new_stream->chan = channel;
new_stream->wait_fd = fd;
+ /* Metadata chan refcount is increment in add_metadata_stream */
+ if (new_stream->chan->type != CONSUMER_CHANNEL_TYPE_METADATA) {
+ /* Update channel refcount */
+ uatomic_inc(&new_stream->chan->refcount);
+ }
+
/*
* The buffer flush is done on the session daemon side for the kernel
* so no need for the stream "hangup_flush_done" variable to be
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock,
new_stream->name, new_stream->chan->pathname,
- &new_stream->relayd_stream_id);
+ &new_stream->relayd_stream_id,
+ new_stream->chan->tracefile_size,
+ new_stream->chan->tracefile_count);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
consumer_del_stream(new_stream, NULL);
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
- stream_pipe = ctx->consumer_metadata_pipe[1];
+ stream_pipe = ctx->consumer_metadata_pipe;
} else {
- stream_pipe = ctx->consumer_data_pipe[1];
+ stream_pipe = ctx->consumer_data_pipe;
}
- do {
- ret = write(stream_pipe, &new_stream, sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
- PERROR("Consumer write %s stream to pipe %d",
+ ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
- stream_pipe);
+ lttng_pipe_get_writefd(stream_pipe));
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}