ret = kernctl_snapshot(infd);
if (ret != 0) {
- errno = -ret;
perror("Getting sub-buffer snapshot.");
+ ret = -errno;
}
return ret;
ret = kernctl_snapshot_get_produced(infd, pos);
if (ret != 0) {
- errno = -ret;
perror("kernctl_snapshot_get_produced");
+ ret = -errno;
}
return ret;
}
+/*
+ * Receive command from session daemon and process it.
+ *
+ * Return 1 on success else a negative value or 0.
+ */
int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ if (ret > 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ ret = -1;
+ }
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
- goto end_nosignal;
+ goto error_fatal;
}
DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
msg.u.channel.relayd_id, msg.u.channel.output,
msg.u.channel.tracefile_size,
- msg.u.channel.tracefile_count);
+ msg.u.channel.tracefile_count, 0);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
}
/* If we received an error in add_channel, we need to report it. */
- if (ret != 0) {
- consumer_send_status_msg(sock, ret);
+ if (ret < 0) {
+ ret = consumer_send_status_msg(sock, ret);
+ if (ret < 0) {
+ goto error_fatal;
+ }
goto end_nosignal;
}
/* First send a status message before receiving the fds. */
ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0 || ret_code != LTTNG_OK) {
+ if (ret < 0) {
+ /*
+ * Somehow, the session daemon is not responding
+ * anymore.
+ */
+ goto error_fatal;
+ }
+ if (ret_code != LTTNG_OK) {
/*
- * Somehow, the session daemon is not responding anymore or the
- * channel was not found.
+ * Channel was not found.
*/
goto end_nosignal;
}
new_stream->chan = channel;
new_stream->wait_fd = fd;
+ /*
+ * We've just assigned the channel to the stream so increment the
+ * refcount right now.
+ */
+ uatomic_inc(&new_stream->chan->refcount);
+
/*
* The buffer flush is done on the session daemon side for the kernel
* so no need for the stream "hangup_flush_done" variable to be
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_metadata_pipe;
} else {
+ ret = consumer_add_data_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
stream_pipe = ctx->consumer_data_pipe;
}
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
- consumer_del_stream(new_stream, NULL);
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
goto end_nosignal;
}
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
- goto end_nosignal;
+ goto error_fatal;
}
goto end_nosignal;
ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
if (ret < 0) {
PERROR("send data pending ret code");
+ goto error_fatal;
}
/*
* shutdown during the recv() or send() call.
*/
return 1;
+
+error_fatal:
+ rcu_read_unlock();
+ /* This will issue a consumer stop. */
+ return -1;
}
/*
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
- ret = err;
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
*/
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
+ ret = -errno;
goto end;
}
/* Get the full subbuffer size including padding */
err = kernctl_get_padded_subbuf_size(infd, &len);
if (err != 0) {
- errno = -err;
perror("Getting sub-buffer len failed.");
- ret = err;
+ ret = -errno;
goto end;
}
/* Get subbuffer size without padding */
err = kernctl_get_subbuf_size(infd, &subbuf_size);
if (err != 0) {
- errno = -err;
perror("Getting sub-buffer len failed.");
- ret = err;
+ ret = -errno;
goto end;
}
break;
default:
ERR("Unknown output method");
- ret = -1;
+ ret = -EPERM;
}
err = kernctl_put_next_subbuf(infd);
if (err != 0) {
- errno = -err;
if (errno == EFAULT) {
perror("Error in unreserving sub buffer\n");
} else if (errno == EIO) {
/* Should never happen with newer LTTng versions */
perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
-
- ret = -err;
+ ret = -errno;
goto end;
}
ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
if (ret != 0) {
- errno = -ret;
PERROR("kernctl_get_mmap_len");
+ ret = -errno;
goto error_close_fd;
}
stream->mmap_len = (size_t) mmap_len;
return 0;
error_close_fd:
- {
+ if (stream->out_fd >= 0) {
int err;
err = close(stream->out_fd);
assert(!err);
+ stream->out_fd = -1;
}
error:
return ret;
assert(stream);
+ if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+ ret = 0;
+ goto end;
+ }
+
ret = kernctl_get_next_subbuf(stream->wait_fd);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */