X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=3272c129fe353c2ad6b75f3f359391ce55ced9bf;hb=319dcddc7409961e156af76666fe70d31baec55a;hp=a5691b1c27df6deec93703bdc38d19121d3e237b;hpb=328c2fe7297c941aa9cbcfa4ce944fca1bd7300f;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index a5691b1c2..3272c129f 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -209,7 +209,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, lttng_ht_lookup(ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { - stream = caa_container_of(node, struct lttng_consumer_stream, node); + stream = lttng::utils::container_of(node, <tng_consumer_stream::node); } rcu_read_unlock(); @@ -257,7 +257,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { - channel = caa_container_of(node, struct lttng_consumer_channel, node); + channel = lttng::utils::container_of(node, <tng_consumer_channel::node); } return channel; @@ -292,9 +292,9 @@ static void steal_channel_key(uint64_t key) static void free_channel_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = - caa_container_of(head, struct lttng_ht_node_u64, head); + lttng::utils::container_of(head, <tng_ht_node_u64::head); struct lttng_consumer_channel *channel = - caa_container_of(node, struct lttng_consumer_channel, node); + lttng::utils::container_of(node, <tng_consumer_channel::node); switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -316,9 +316,9 @@ static void free_channel_rcu(struct rcu_head *head) static void free_relayd_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = - caa_container_of(head, struct lttng_ht_node_u64, head); + lttng::utils::container_of(head, <tng_ht_node_u64::head); struct consumer_relayd_sock_pair *relayd = - caa_container_of(node, struct consumer_relayd_sock_pair, node); + lttng::utils::container_of(node, &consumer_relayd_sock_pair::node); /* * Close all sockets. This is done in the call RCU since we don't want the @@ -383,6 +383,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) consumer_timer_monitor_stop(channel); } + /* + * Send a last buffer statistics sample to the session daemon + * to ensure it tracks the amount of data consumed by this channel. + */ + sample_and_send_channel_buffer_stats(channel); + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; @@ -705,7 +711,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { - relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); + relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node); } error: @@ -2713,7 +2719,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2744,7 +2750,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2764,37 +2770,45 @@ void *consumer_thread_data_poll(void *data) pollfd[i].fd); lttng_ustconsumer_on_stream_hangup(local_stream[i]); /* Attempt read again, for the data we just flushed. */ - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } /* + * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is + * performed. This type of flush ensures that a new packet is produced no + * matter the consumed/produced positions are. + * + * This, in turn, causes the next pass to see that data available for the + * stream. When we come back here, we can be assured that all available + * data has been consumed and we can finally destroy the stream. + * * If the poll flag is HUP/ERR/NVAL and we have * read no data in this pass, we can remove the * stream from its hash table. */ if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } if (local_stream[i] != NULL) { - local_stream[i]->data_read = 0; + local_stream[i]->has_data_left_to_be_read_before_teardown = 0; } } }