X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=f97c119705451c1f2080e99327de6488e9283062;hb=1524f98c04431d04e50796f83a9dd29184b3a8a4;hp=ae8469ea233038b8478626e3da6fc70f0c014b59;hpb=0114db0ec2407029052eb61a0189c9b1cd64d520;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index ae8469ea2..f97c11970 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -383,6 +383,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) consumer_timer_monitor_stop(channel); } + /* + * Send a last buffer statistics sample to the session daemon + * to ensure it tracks the amount of data consumed by this channel. + */ + sample_and_send_channel_buffer_stats(channel); + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; @@ -2161,7 +2167,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, consumer_stream_delete(stream, ht); /* Close down everything including the relayd if one. */ - consumer_stream_close(stream); + consumer_stream_close_output(stream); /* Destroy tracer buffers of the stream. */ consumer_stream_destroy_buffers(stream); @@ -2422,9 +2428,8 @@ restart: stream->wait_fd); /* Add metadata stream to the global poll events list */ - lttng_poll_add(&events, stream->wait_fd, - LPOLLIN | LPOLLPRI | LPOLLHUP); - } else if (revents & (LPOLLERR | LPOLLHUP)) { + lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI); + }else if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Metadata thread pipe hung up"); /* * Remove the pipe from the poll set and continue the loop @@ -2713,7 +2718,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2744,7 +2749,7 @@ void *consumer_thread_data_poll(void *data) consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; } else if (len > 0) { - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } } } @@ -2764,37 +2769,45 @@ void *consumer_thread_data_poll(void *data) pollfd[i].fd); lttng_ustconsumer_on_stream_hangup(local_stream[i]); /* Attempt read again, for the data we just flushed. */ - local_stream[i]->data_read = 1; + local_stream[i]->has_data_left_to_be_read_before_teardown = 1; } /* + * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is + * performed. This type of flush ensures that a new packet is produced no + * matter the consumed/produced positions are. + * + * This, in turn, causes the next pass to see that data available for the + * stream. When we come back here, we can be assured that all available + * data has been consumed and we can finally destroy the stream. + * * If the poll flag is HUP/ERR/NVAL and we have * read no data in this pass, we can remove the * stream from its hash table. */ if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - if (!local_stream[i]->data_read) { + if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = NULL; num_hup++; } } if (local_stream[i] != NULL) { - local_stream[i]->data_read = 0; + local_stream[i]->has_data_left_to_be_read_before_teardown = 0; } } } @@ -3006,8 +3019,8 @@ restart: &chan->wait_fd_node); rcu_read_unlock(); /* Add channel to the global poll events list */ - lttng_poll_add(&events, chan->wait_fd, - LPOLLERR | LPOLLHUP); + // FIXME: Empty flag on a pipe pollset, this might hang on FreeBSD. + lttng_poll_add(&events, chan->wait_fd, 0); break; case CONSUMER_CHANNEL_DEL: {