Clean-up: consumerd: reduce duplication of stream output close code
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
index 42242ad1231aed54a79ef30ab3b324e353809c6f..ce1d3d8293463898d09b1d9f12eeab018dbbbf9e 100644 (file)
 #include <sys/stat.h>
 #include <stdint.h>
 
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/compat/fcntl.h>
-#include <common/compat/endian.h>
-#include <common/pipe.h>
-#include <common/relayd/relayd.h>
-#include <common/utils.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/index/index.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/optional.h>
-#include <common/buffer-view.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/metadata-bucket.h>
-
-#include "kernel-consumer.h"
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/common.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/compat/endian.hpp>
+#include <common/pipe.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/utils.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/index/index.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/optional.hpp>
+#include <common/buffer-view.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/consumer/metadata-bucket.hpp>
+
+#include "kernel-consumer.hpp"
 
 extern struct lttng_consumer_global_data the_consumer_data;
 extern int consumer_poll_timeout;
@@ -145,8 +145,7 @@ error:
 static int lttng_kconsumer_snapshot_channel(
                struct lttng_consumer_channel *channel,
                uint64_t key, char *path, uint64_t relayd_id,
-               uint64_t nb_packets_per_stream,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t nb_packets_per_stream)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -199,13 +198,13 @@ static int lttng_kconsumer_snapshot_channel(
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
-                               goto end_unlock;
+                               goto error_close_stream_output;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream,
                                        false);
                        if (ret < 0) {
-                               goto end_unlock;
+                               goto error_close_stream_output;
                        }
                        DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
                                        stream->key);
@@ -223,7 +222,7 @@ static int lttng_kconsumer_snapshot_channel(
                        ret = kernctl_buffer_flush(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Failed to flush kernel stream");
-                               goto end_unlock;
+                               goto error_close_stream_output;
                        }
                        goto end_unlock;
                }
@@ -231,19 +230,19 @@ static int lttng_kconsumer_snapshot_channel(
                ret = lttng_kconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking kernel snapshot");
-                       goto end_unlock;
+                       goto error_close_stream_output;
                }
 
                ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced kernel snapshot position");
-                       goto end_unlock;
+                       goto error_close_stream_output;
                }
 
                ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd kernel snapshot position");
-                       goto end_unlock;
+                       goto error_close_stream_output;
                }
 
                consumed_pos = consumer_get_consume_start_pos(consumed_pos,
@@ -263,7 +262,7 @@ static int lttng_kconsumer_snapshot_channel(
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
-                                       goto end_unlock;
+                                       goto error_close_stream_output;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
@@ -313,26 +312,12 @@ static int lttng_kconsumer_snapshot_channel(
                        ret = kernctl_put_subbuf(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_put_subbuf");
-                               goto end_unlock;
+                               goto error_close_stream_output;
                        }
                        consumed_pos += stream->max_sb_size;
                }
 
-               if (relayd_id == (uint64_t) -1ULL) {
-                       if (stream->out_fd >= 0) {
-                               ret = close(stream->out_fd);
-                               if (ret < 0) {
-                                       PERROR("Kernel consumer snapshot close out_fd");
-                                       goto end_unlock;
-                               }
-                               stream->out_fd = -1;
-                       }
-               } else {
-                       close_relayd_stream(stream);
-                       stream->net_seq_idx = (uint64_t) -1ULL;
-               }
-               lttng_trace_chunk_put(stream->trace_chunk);
-               stream->trace_chunk = NULL;
+               consumer_stream_close_output(stream);
                pthread_mutex_unlock(&stream->lock);
        }
 
@@ -345,6 +330,8 @@ error_put_subbuf:
        if (ret < 0) {
                ERR("Snapshot kernctl_put_subbuf error path");
        }
+error_close_stream_output:
+       consumer_stream_close_output(stream);
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
 end:
@@ -989,8 +976,7 @@ error_streams_sent_nosignal:
                                                msg.u.snapshot_channel.pathname,
                                                msg.u.snapshot_channel.relayd_id,
                                                msg.u.snapshot_channel
-                                                               .nb_packets_per_stream,
-                                               ctx);
+                                                               .nb_packets_per_stream);
                                if (ret_snapshot < 0) {
                                        ERR("Snapshot channel failed");
                                        ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
@@ -1184,8 +1170,7 @@ end_destroy_channel:
 
                        ret_rotate_channel = lttng_consumer_rotate_channel(
                                        channel, key,
-                                       msg.u.rotate_channel.relayd_id,
-                                       msg.u.rotate_channel.metadata, ctx);
+                                       msg.u.rotate_channel.relayd_id);
                        if (ret_rotate_channel < 0) {
                                ERR("Rotate channel failed");
                                ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
@@ -1204,7 +1189,7 @@ end_destroy_channel:
                        int ret_rotate;
 
                        ret_rotate = lttng_consumer_rotate_ready_streams(
-                                       channel, key, ctx);
+                                       channel, key);
                        if (ret_rotate < 0) {
                                ERR("Rotate ready streams failed");
                        }
@@ -1247,9 +1232,13 @@ error_rotate_channel:
        case LTTNG_CONSUMER_INIT:
        {
                int ret_send_status;
+               lttng_uuid sessiond_uuid;
+
+               std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
+                               sessiond_uuid.begin());
 
                ret_code = lttng_consumer_init_command(ctx,
-                               msg.u.init.sessiond_uuid);
+                               sessiond_uuid);
                health_code_update();
                ret_send_status = consumer_send_status_msg(sock, ret_code);
                if (ret_send_status < 0) {
@@ -1734,7 +1723,7 @@ end:
 
 static
 int put_next_subbuffer(struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuffer)
+               struct stream_subbuffer *subbuffer __attribute__((unused)))
 {
        const int ret = kernctl_put_next_subbuf(stream->wait_fd);
 
@@ -1766,7 +1755,7 @@ bool is_get_next_check_metadata_available(int tracer_fd)
 
 static
 int signal_metadata(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx __attribute__((unused)))
 {
        ASSERT_LOCKED(stream->metadata_rdv_lock);
        return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
This page took 0.02674 seconds and 4 git commands to generate.