X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=3272c129fe353c2ad6b75f3f359391ce55ced9bf;hb=319dcddc7409961e156af76666fe70d31baec55a;hp=0585b7d50ee149d28243697f46cca6dc427b9f4a;hpb=48b7cdc221a445188d6d9bd08fc1686837e71224;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 0585b7d50..3272c129f 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -7,42 +7,42 @@ * */ -#include "common/index/ctf-index.h" #define _LGPL_SOURCE +#include #include #include +#include #include #include #include #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include lttng_consumer_global_data the_consumer_data; @@ -52,12 +52,22 @@ enum consumer_channel_action { CONSUMER_CHANNEL_QUIT, }; +namespace { struct consumer_channel_msg { enum consumer_channel_action action; struct lttng_consumer_channel *chan; /* add */ uint64_t key; /* del */ }; +/* + * Global hash table containing respectively metadata and data streams. The + * stream element in this ht should only be updated by the metadata poll thread + * for the metadata and the data poll thread for the data. + */ +struct lttng_ht *metadata_ht; +struct lttng_ht *data_ht; +} /* namespace */ + /* Flag used to temporarily pause data consumption from testpoints. */ int data_consumption_paused; @@ -69,14 +79,6 @@ int data_consumption_paused; */ int consumer_quit; -/* - * Global hash table containing respectively metadata and data streams. The - * stream element in this ht should only be updated by the metadata poll thread - * for the metadata and the data poll thread for the data. - */ -static struct lttng_ht *metadata_ht; -static struct lttng_ht *data_ht; - static const char *get_consumer_domain(void) { switch (the_consumer_data.type) { @@ -173,7 +175,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, send_node) { - cds_list_del(&stream->send_node); /* * Once a stream is added to this list, the buffers were created so we * have a guarantee that this call will succeed. Setting the monitor @@ -208,7 +209,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, lttng_ht_lookup(ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { - stream = caa_container_of(node, struct lttng_consumer_stream, node); + stream = lttng::utils::container_of(node, <tng_consumer_stream::node); } rcu_read_unlock(); @@ -256,7 +257,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { - channel = caa_container_of(node, struct lttng_consumer_channel, node); + channel = lttng::utils::container_of(node, <tng_consumer_channel::node); } return channel; @@ -291,9 +292,9 @@ static void steal_channel_key(uint64_t key) static void free_channel_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = - caa_container_of(head, struct lttng_ht_node_u64, head); + lttng::utils::container_of(head, <tng_ht_node_u64::head); struct lttng_consumer_channel *channel = - caa_container_of(node, struct lttng_consumer_channel, node); + lttng::utils::container_of(node, <tng_consumer_channel::node); switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -315,9 +316,9 @@ static void free_channel_rcu(struct rcu_head *head) static void free_relayd_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = - caa_container_of(head, struct lttng_ht_node_u64, head); + lttng::utils::container_of(head, <tng_ht_node_u64::head); struct consumer_relayd_sock_pair *relayd = - caa_container_of(node, struct consumer_relayd_sock_pair, node); + lttng::utils::container_of(node, &consumer_relayd_sock_pair::node); /* * Close all sockets. This is done in the call RCU since we don't want the @@ -382,6 +383,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) consumer_timer_monitor_stop(channel); } + /* + * Send a last buffer statistics sample to the session daemon + * to ensure it tracks the amount of data consumed by this channel. + */ + sample_and_send_channel_buffer_stats(channel); + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; @@ -663,7 +670,7 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( goto error; } - obj = (consumer_relayd_sock_pair *) zmalloc(sizeof(struct consumer_relayd_sock_pair)); + obj = zmalloc(); if (obj == NULL) { PERROR("zmalloc relayd sock"); goto error; @@ -704,7 +711,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { - relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); + relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node); } error: @@ -1030,7 +1037,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = (lttng_consumer_channel *) zmalloc(sizeof(*channel)); + channel = zmalloc(); if (channel == NULL) { PERROR("malloc struct lttng_consumer_channel"); goto end; @@ -1426,7 +1433,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( the_consumer_data.type == type); the_consumer_data.type = type; - ctx = (lttng_consumer_local_data *) zmalloc(sizeof(struct lttng_consumer_local_data)); + ctx = zmalloc(); if (ctx == NULL) { PERROR("allocating context"); goto error; @@ -2562,7 +2569,7 @@ void *consumer_thread_data_poll(void *data) health_code_update(); - local_stream = (lttng_consumer_stream **) zmalloc(sizeof(struct lttng_consumer_stream *)); + local_stream = zmalloc(); if (local_stream == NULL) { PERROR("local_stream malloc"); goto end; @@ -2587,18 +2594,14 @@ void *consumer_thread_data_poll(void *data) local_stream = NULL; /* Allocate for all fds */ - pollfd = (struct pollfd *) zmalloc((the_consumer_data.stream_count + - nb_pipes_fd) * - sizeof(struct pollfd)); + pollfd = calloc(the_consumer_data.stream_count + nb_pipes_fd); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&the_consumer_data.lock); goto end; } - local_stream = (lttng_consumer_stream **) zmalloc((the_consumer_data.stream_count + - nb_pipes_fd) * - sizeof(struct lttng_consumer_stream *)); + local_stream = calloc(the_consumer_data.stream_count + nb_pipes_fd); if (local_stream == NULL) { PERROR("local_stream malloc"); pthread_mutex_unlock(&the_consumer_data.lock); @@ -2716,7 +2719,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2747,7 +2750,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2767,37 +2770,45 @@ void *consumer_thread_data_poll(void *data) pollfd[i].fd); lttng_ustconsumer_on_stream_hangup(local_stream[i]); /* Attempt read again, for the data we just flushed. */ - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } /* + * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is + * performed. This type of flush ensures that a new packet is produced no + * matter the consumed/produced positions are. + * + * This, in turn, causes the next pass to see that data available for the + * stream. When we come back here, we can be assured that all available + * data has been consumed and we can finally destroy the stream. + * * If the poll flag is HUP/ERR/NVAL and we have * read no data in this pass, we can remove the * stream from its hash table. */ if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } if (local_stream[i] != NULL) { - local_stream[i]->data_read = 0; + local_stream[i]->has_data_left_to_be_read_before_teardown = 0; } } } @@ -3404,7 +3415,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, */ if (stream->rotate_ready) { DBG("Rotate stream before consuming data"); - ret = lttng_consumer_rotate_stream(ctx, stream); + ret = lttng_consumer_rotate_stream(stream); if (ret < 0) { ERR("Stream rotation error before consuming data"); goto end; @@ -3460,7 +3471,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, */ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + rotation_ret = lttng_consumer_rotate_stream(stream); if (rotation_ret < 0) { ret = rotation_ret; ERR("Stream rotation error after consuming data"); @@ -3564,18 +3575,23 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ - void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, - struct lttng_consumer_local_data *ctx, int sock, +void consumer_add_relayd_socket(uint64_t net_seq_idx, + int sock_type, + struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, - uint64_t relayd_session_id) + uint64_t sessiond_id, + uint64_t relayd_session_id, + uint32_t relayd_version_major, + uint32_t relayd_version_minor, + enum lttcomm_sock_proto relayd_socket_protocol) { int fd = -1, ret = -1, relayd_created = 0; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct consumer_relayd_sock_pair *relayd = NULL; LTTNG_ASSERT(ctx); - LTTNG_ASSERT(relayd_sock); + LTTNG_ASSERT(sock >= 0); ASSERT_RCU_READ_LOCKED(); DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); @@ -3645,54 +3661,25 @@ error: switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->control_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->control_sock.sock.fd)) { - PERROR("close"); - } + ret = lttcomm_populate_sock_from_open_socket( + &relayd->control_sock.sock, fd, + relayd_socket_protocol); - /* Assign new file descriptor */ - relayd->control_sock.sock.fd = fd; /* Assign version values. */ - relayd->control_sock.major = relayd_sock->major; - relayd->control_sock.minor = relayd_sock->minor; + relayd->control_sock.major = relayd_version_major; + relayd->control_sock.minor = relayd_version_minor; relayd->relayd_session_id = relayd_session_id; break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->data_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->data_sock.sock.fd)) { - PERROR("close"); - } - - /* Assign new file descriptor */ - relayd->data_sock.sock.fd = fd; + ret = lttcomm_populate_sock_from_open_socket( + &relayd->data_sock.sock, fd, + relayd_socket_protocol); /* Assign version values. */ - relayd->data_sock.major = relayd_sock->major; - relayd->data_sock.minor = relayd_sock->minor; + relayd->data_sock.major = relayd_version_major; + relayd->data_sock.minor = relayd_version_minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type); @@ -3700,6 +3687,11 @@ error: goto error; } + if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); @@ -4005,8 +3997,7 @@ end: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, uint64_t relayd_id, uint32_t metadata, - struct lttng_consumer_local_data *ctx) + uint64_t key, uint64_t relayd_id) { int ret; struct lttng_consumer_stream *stream; @@ -4556,8 +4547,7 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre * Perform the rotation a local stream file. */ static -int rotate_local_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int rotate_local_stream(struct lttng_consumer_stream *stream) { int ret = 0; @@ -4596,8 +4586,7 @@ end: * * Return 0 on success, a negative number of error. */ -int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream) { int ret; @@ -4632,7 +4621,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } if (stream->net_seq_idx == (uint64_t) -1ULL) { - ret = rotate_local_stream(ctx, stream); + ret = rotate_local_stream(stream); if (ret < 0) { ERR("Failed to rotate stream, ret = %i", ret); goto error; @@ -4676,7 +4665,7 @@ error: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, - uint64_t key, struct lttng_consumer_local_data *ctx) + uint64_t key) { int ret; struct lttng_consumer_stream *stream; @@ -4705,7 +4694,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, } DBG("Consumer rotate ready stream %" PRIu64, stream->key); - ret = lttng_consumer_rotate_stream(ctx, stream); + ret = lttng_consumer_rotate_stream(stream); pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); if (ret) { @@ -4722,7 +4711,7 @@ end: enum lttcomm_return_code lttng_consumer_init_command( struct lttng_consumer_local_data *ctx, - const lttng_uuid sessiond_uuid) + const lttng_uuid& sessiond_uuid) { enum lttcomm_return_code ret; char uuid_str[LTTNG_UUID_STR_LEN]; @@ -4733,7 +4722,7 @@ enum lttcomm_return_code lttng_consumer_init_command( } ctx->sessiond_uuid.is_set = true; - memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid)); + ctx->sessiond_uuid.value = sessiond_uuid; ret = LTTCOMM_CONSUMERD_SUCCESS; lttng_uuid_to_str(sessiond_uuid, uuid_str); DBG("Received session daemon UUID: %s", uuid_str);