Fix: kernel ctl error codes are based on errno
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index f3a3a22b55a230a427bad1fe7c7cb844de236d8a..ea6237959c107c6db70a4f47c66fc3b08837b8d8 100644 (file)
@@ -56,8 +56,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
 
        ret = kernctl_snapshot(infd);
        if (ret != 0) {
-               errno = -ret;
                perror("Getting sub-buffer snapshot.");
+               ret = -errno;
        }
 
        return ret;
@@ -76,13 +76,18 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
 
        ret = kernctl_snapshot_get_produced(infd, pos);
        if (ret != 0) {
-               errno = -ret;
                perror("kernctl_snapshot_get_produced");
+               ret = -errno;
        }
 
        return ret;
 }
 
+/*
+ * Receive command from session daemon and process it.
+ *
+ * Return 1 on success else a negative value or 0.
+ */
 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
@@ -92,7 +97,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+               if (ret > 0) {
+                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+                       ret = -1;
+               }
                return ret;
        }
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
@@ -128,7 +136,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
+                       goto error_fatal;
                }
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
@@ -136,7 +144,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
                                msg.u.channel.relayd_id, msg.u.channel.output,
                                msg.u.channel.tracefile_size,
-                               msg.u.channel.tracefile_count);
+                               msg.u.channel.tracefile_count, 0);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
@@ -166,8 +174,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
 
                /* If we received an error in add_channel, we need to report it. */
-               if (ret != 0) {
-                       consumer_send_status_msg(sock, ret);
+               if (ret < 0) {
+                       ret = consumer_send_status_msg(sock, ret);
+                       if (ret < 0) {
+                               goto error_fatal;
+                       }
                        goto end_nosignal;
                }
 
@@ -198,10 +209,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0 || ret_code != LTTNG_OK) {
+               if (ret < 0) {
+                       /*
+                        * Somehow, the session daemon is not responding
+                        * anymore.
+                        */
+                       goto error_fatal;
+               }
+               if (ret_code != LTTNG_OK) {
                        /*
-                        * Somehow, the session daemon is not responding anymore or the
-                        * channel was not found.
+                        * Channel was not found.
                         */
                        goto end_nosignal;
                }
@@ -255,6 +272,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
 
+               /*
+                * We've just assigned the channel to the stream so increment the
+                * refcount right now.
+                */
+               uatomic_inc(&new_stream->chan->refcount);
+
                /*
                 * The buffer flush is done on the session daemon side for the kernel
                 * so no need for the stream "hangup_flush_done" variable to be
@@ -296,8 +319,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
+                       ret = consumer_add_metadata_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_metadata_pipe;
                } else {
+                       ret = consumer_add_data_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_data_pipe;
                }
 
@@ -306,7 +343,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
                                        lttng_pipe_get_writefd(stream_pipe));
-                       consumer_del_stream(new_stream, NULL);
+                       if (new_stream->metadata_flag) {
+                               consumer_del_stream_for_metadata(new_stream);
+                       } else {
+                               consumer_del_stream_for_data(new_stream);
+                       }
                        goto end_nosignal;
                }
 
@@ -350,7 +391,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
+                       goto error_fatal;
                }
 
                goto end_nosignal;
@@ -368,6 +409,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
                        PERROR("send data pending ret code");
+                       goto error_fatal;
                }
 
                /*
@@ -388,6 +430,11 @@ end_nosignal:
         * shutdown during the recv() or send() call.
         */
        return 1;
+
+error_fatal:
+       rcu_read_unlock();
+       /* This will issue a consumer stop. */
+       return -1;
 }
 
 /*
@@ -405,7 +452,6 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
-               ret = err;
                /*
                 * This is a debug message even for single-threaded consumer,
                 * because poll() have more relaxed criterions than get subbuf,
@@ -414,15 +460,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 */
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency)");
+               ret = -errno;
                goto end;
        }
 
        /* Get the full subbuffer size including padding */
        err = kernctl_get_padded_subbuf_size(infd, &len);
        if (err != 0) {
-               errno = -err;
                perror("Getting sub-buffer len failed.");
-               ret = err;
+               ret = -errno;
                goto end;
        }
 
@@ -457,9 +503,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
                if (err != 0) {
-                       errno = -err;
                        perror("Getting sub-buffer len failed.");
-                       ret = err;
+                       ret = -errno;
                        goto end;
                }
 
@@ -489,20 +534,18 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        default:
                ERR("Unknown output method");
-               ret = -1;
+               ret = -EPERM;
        }
 
        err = kernctl_put_next_subbuf(infd);
        if (err != 0) {
-               errno = -err;
                if (errno == EFAULT) {
                        perror("Error in unreserving sub buffer\n");
                } else if (errno == EIO) {
                        /* Should never happen with newer LTTng versions */
                        perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
                }
-
-               ret = -err;
+               ret = -errno;
                goto end;
        }
 
@@ -534,8 +577,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 
                ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
                if (ret != 0) {
-                       errno = -ret;
                        PERROR("kernctl_get_mmap_len");
+                       ret = -errno;
                        goto error_close_fd;
                }
                stream->mmap_len = (size_t) mmap_len;
@@ -553,11 +596,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        return 0;
 
 error_close_fd:
-       {
+       if (stream->out_fd >= 0) {
                int err;
 
                err = close(stream->out_fd);
                assert(!err);
+               stream->out_fd = -1;
        }
 error:
        return ret;
@@ -577,6 +621,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
+       if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+               ret = 0;
+               goto end;
+       }
+
        ret = kernctl_get_next_subbuf(stream->wait_fd);
        if (ret == 0) {
                /* There is still data so let's put back this subbuffer. */
This page took 0.028175 seconds and 4 git commands to generate.