Fix: sessiond: size-based rotation threshold exceeded in per-pid tracing (2/2)
[lttng-tools.git] / src / common / consumer / consumer.cpp
index 838990ccedefd7e24d693721140c0eb1924e3d22..3272c129fe353c2ad6b75f3f359391ce55ced9bf 100644 (file)
@@ -7,43 +7,42 @@
  *
  */
 
-#include "common/index/ctf-index.h"
-#include <stdint.h>
 #define _LGPL_SOURCE
+#include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
+#include <signal.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include <inttypes.h>
-#include <signal.h>
 
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/utils.h>
-#include <common/time.h>
-#include <common/compat/poll.h>
-#include <common/compat/endian.h>
-#include <common/index/index.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/relayd/relayd.h>
-#include <common/ust-consumer/ust-consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-testpoint.h>
-#include <common/align.h>
-#include <common/consumer/consumer-metadata-cache.h>
-#include <common/trace-chunk.h>
-#include <common/trace-chunk-registry.h>
-#include <common/string-utils/format.h>
-#include <common/dynamic-array.h>
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/align.hpp>
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-testpoint.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/dynamic-array.hpp>
+#include <common/index/ctf-index.hpp>
+#include <common/index/index.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/string-utils/format.hpp>
+#include <common/time.hpp>
+#include <common/trace-chunk-registry.hpp>
+#include <common/trace-chunk.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+#include <common/utils.hpp>
 
 lttng_consumer_global_data the_consumer_data;
 
@@ -53,12 +52,22 @@ enum consumer_channel_action {
        CONSUMER_CHANNEL_QUIT,
 };
 
+namespace {
 struct consumer_channel_msg {
        enum consumer_channel_action action;
        struct lttng_consumer_channel *chan;    /* add */
        uint64_t key;                           /* del */
 };
 
+/*
+ * Global hash table containing respectively metadata and data streams. The
+ * stream element in this ht should only be updated by the metadata poll thread
+ * for the metadata and the data poll thread for the data.
+ */
+struct lttng_ht *metadata_ht;
+struct lttng_ht *data_ht;
+} /* namespace */
+
 /* Flag used to temporarily pause data consumption from testpoints. */
 int data_consumption_paused;
 
@@ -70,14 +79,6 @@ int data_consumption_paused;
  */
 int consumer_quit;
 
-/*
- * Global hash table containing respectively metadata and data streams. The
- * stream element in this ht should only be updated by the metadata poll thread
- * for the metadata and the data poll thread for the data.
- */
-static struct lttng_ht *metadata_ht;
-static struct lttng_ht *data_ht;
-
 static const char *get_consumer_domain(void)
 {
        switch (the_consumer_data.type) {
@@ -174,7 +175,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
        /* Delete streams that might have been left in the stream list. */
        cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
                        send_node) {
-               cds_list_del(&stream->send_node);
                /*
                 * Once a stream is added to this list, the buffers were created so we
                 * have a guarantee that this call will succeed. Setting the monitor
@@ -209,7 +209,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key,
        lttng_ht_lookup(ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
-               stream = caa_container_of(node, struct lttng_consumer_stream, node);
+               stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
 
        rcu_read_unlock();
@@ -257,7 +257,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
-               channel = caa_container_of(node, struct lttng_consumer_channel, node);
+               channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
        }
 
        return channel;
@@ -292,9 +292,9 @@ static void steal_channel_key(uint64_t key)
 static void free_channel_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
+               lttng::utils::container_of(head, &lttng_ht_node_u64::head);
        struct lttng_consumer_channel *channel =
-               caa_container_of(node, struct lttng_consumer_channel, node);
+               lttng::utils::container_of(node, &lttng_consumer_channel::node);
 
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -316,9 +316,9 @@ static void free_channel_rcu(struct rcu_head *head)
 static void free_relayd_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
+               lttng::utils::container_of(head, &lttng_ht_node_u64::head);
        struct consumer_relayd_sock_pair *relayd =
-               caa_container_of(node, struct consumer_relayd_sock_pair, node);
+               lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
 
        /*
         * Close all sockets. This is done in the call RCU since we don't want the
@@ -383,6 +383,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                consumer_timer_monitor_stop(channel);
        }
 
+       /*
+        * Send a last buffer statistics sample to the session daemon
+        * to ensure it tracks the amount of data consumed by this channel.
+        */
+       sample_and_send_channel_buffer_stats(channel);
+
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
@@ -664,7 +670,7 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
                goto error;
        }
 
-       obj = (consumer_relayd_sock_pair *) zmalloc(sizeof(struct consumer_relayd_sock_pair));
+       obj = zmalloc<consumer_relayd_sock_pair>();
        if (obj == NULL) {
                PERROR("zmalloc relayd sock");
                goto error;
@@ -705,7 +711,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
        lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
-               relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
+               relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
        }
 
 error:
@@ -1031,7 +1037,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = (lttng_consumer_channel *) zmalloc(sizeof(*channel));
+       channel = zmalloc<lttng_consumer_channel>();
        if (channel == NULL) {
                PERROR("malloc struct lttng_consumer_channel");
                goto end;
@@ -1427,7 +1433,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                        the_consumer_data.type == type);
        the_consumer_data.type = type;
 
-       ctx = (lttng_consumer_local_data *) zmalloc(sizeof(struct lttng_consumer_local_data));
+       ctx = zmalloc<lttng_consumer_local_data>();
        if (ctx == NULL) {
                PERROR("allocating context");
                goto error;
@@ -2563,7 +2569,7 @@ void *consumer_thread_data_poll(void *data)
 
        health_code_update();
 
-       local_stream = (lttng_consumer_stream **) zmalloc(sizeof(struct lttng_consumer_stream *));
+       local_stream = zmalloc<lttng_consumer_stream *>();
        if (local_stream == NULL) {
                PERROR("local_stream malloc");
                goto end;
@@ -2588,18 +2594,14 @@ void *consumer_thread_data_poll(void *data)
                        local_stream = NULL;
 
                        /* Allocate for all fds */
-                       pollfd = (struct pollfd *) zmalloc((the_consumer_data.stream_count +
-                                                        nb_pipes_fd) *
-                                       sizeof(struct pollfd));
+                       pollfd = calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&the_consumer_data.lock);
                                goto end;
                        }
 
-                       local_stream = (lttng_consumer_stream **) zmalloc((the_consumer_data.stream_count +
-                                                              nb_pipes_fd) *
-                                       sizeof(struct lttng_consumer_stream *));
+                       local_stream = calloc<lttng_consumer_stream *>(the_consumer_data.stream_count + nb_pipes_fd);
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
                                pthread_mutex_unlock(&the_consumer_data.lock);
@@ -2717,7 +2719,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2748,7 +2750,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2768,37 +2770,45 @@ void *consumer_thread_data_poll(void *data)
                                                pollfd[i].fd);
                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                                /* Attempt read again, for the data we just flushed. */
-                               local_stream[i]->data_read = 1;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                        }
                        /*
+                        * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+                        * performed. This type of flush ensures that a new packet is produced no
+                        * matter the consumed/produced positions are.
+                        *
+                        * This, in turn, causes the next pass to see that data available for the
+                        * stream. When we come back here, we can be assured that all available
+                        * data has been consumed and we can finally destroy the stream.
+                        *
                         * If the poll flag is HUP/ERR/NVAL and we have
                         * read no data in this pass, we can remove the
                         * stream from its hash table.
                         */
                        if ((pollfd[i].revents & POLLHUP)) {
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        }
                        if (local_stream[i] != NULL) {
-                               local_stream[i]->data_read = 0;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
                        }
                }
        }
@@ -3405,7 +3415,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
         */
        if (stream->rotate_ready) {
                DBG("Rotate stream before consuming data");
-               ret = lttng_consumer_rotate_stream(ctx, stream);
+               ret = lttng_consumer_rotate_stream(stream);
                if (ret < 0) {
                        ERR("Stream rotation error before consuming data");
                        goto end;
@@ -3461,7 +3471,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
         */
        rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
        if (rotation_ret == 1) {
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+               rotation_ret = lttng_consumer_rotate_stream(stream);
                if (rotation_ret < 0) {
                        ret = rotation_ret;
                        ERR("Stream rotation error after consuming data");
@@ -3987,8 +3997,7 @@ end:
  * Returns 0 on success, < 0 on error
  */
 int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
-               uint64_t key, uint64_t relayd_id, uint32_t metadata,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t key, uint64_t relayd_id)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -4538,8 +4547,7 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre
  * Perform the rotation a local stream file.
  */
 static
-int rotate_local_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int rotate_local_stream(struct lttng_consumer_stream *stream)
 {
        int ret = 0;
 
@@ -4578,8 +4586,7 @@ end:
  *
  * Return 0 on success, a negative number of error.
  */
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
@@ -4614,7 +4621,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        }
 
        if (stream->net_seq_idx == (uint64_t) -1ULL) {
-               ret = rotate_local_stream(ctx, stream);
+               ret = rotate_local_stream(stream);
                if (ret < 0) {
                        ERR("Failed to rotate stream, ret = %i", ret);
                        goto error;
@@ -4658,7 +4665,7 @@ error:
  * Returns 0 on success, < 0 on error
  */
 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
-               uint64_t key, struct lttng_consumer_local_data *ctx)
+               uint64_t key)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -4687,7 +4694,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
                }
                DBG("Consumer rotate ready stream %" PRIu64, stream->key);
 
-               ret = lttng_consumer_rotate_stream(ctx, stream);
+               ret = lttng_consumer_rotate_stream(stream);
                pthread_mutex_unlock(&stream->lock);
                pthread_mutex_unlock(&stream->chan->lock);
                if (ret) {
@@ -4704,7 +4711,7 @@ end:
 
 enum lttcomm_return_code lttng_consumer_init_command(
                struct lttng_consumer_local_data *ctx,
-               const lttng_uuid sessiond_uuid)
+               const lttng_uuid& sessiond_uuid)
 {
        enum lttcomm_return_code ret;
        char uuid_str[LTTNG_UUID_STR_LEN];
@@ -4715,7 +4722,7 @@ enum lttcomm_return_code lttng_consumer_init_command(
        }
 
        ctx->sessiond_uuid.is_set = true;
-       memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+       ctx->sessiond_uuid.value = sessiond_uuid;
        ret = LTTCOMM_CONSUMERD_SUCCESS;
        lttng_uuid_to_str(sessiond_uuid, uuid_str);
        DBG("Received session daemon UUID: %s", uuid_str);
This page took 0.029304 seconds and 4 git commands to generate.