Cleanup: remove ignored flags from poll events bitmasks
[lttng-tools.git] / src / common / consumer / consumer.cpp
index ae8469ea233038b8478626e3da6fc70f0c014b59..f97c119705451c1f2080e99327de6488e9283062 100644 (file)
@@ -383,6 +383,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                consumer_timer_monitor_stop(channel);
        }
 
+       /*
+        * Send a last buffer statistics sample to the session daemon
+        * to ensure it tracks the amount of data consumed by this channel.
+        */
+       sample_and_send_channel_buffer_stats(channel);
+
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
@@ -2161,7 +2167,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        consumer_stream_delete(stream, ht);
 
        /* Close down everything including the relayd if one. */
-       consumer_stream_close(stream);
+       consumer_stream_close_output(stream);
        /* Destroy tracer buffers of the stream. */
        consumer_stream_destroy_buffers(stream);
 
@@ -2422,9 +2428,8 @@ restart:
                                                        stream->wait_fd);
 
                                        /* Add metadata stream to the global poll events list */
-                                       lttng_poll_add(&events, stream->wait_fd,
-                                                       LPOLLIN | LPOLLPRI | LPOLLHUP);
-                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI);
+                               }else if (revents & (LPOLLERR | LPOLLHUP)) {
                                        DBG("Metadata thread pipe hung up");
                                        /*
                                         * Remove the pipe from the poll set and continue the loop
@@ -2713,7 +2718,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2744,7 +2749,7 @@ void *consumer_thread_data_poll(void *data)
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                                       local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                                }
                        }
                }
@@ -2764,37 +2769,45 @@ void *consumer_thread_data_poll(void *data)
                                                pollfd[i].fd);
                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                                /* Attempt read again, for the data we just flushed. */
-                               local_stream[i]->data_read = 1;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
                        }
                        /*
+                        * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+                        * performed. This type of flush ensures that a new packet is produced no
+                        * matter the consumed/produced positions are.
+                        *
+                        * This, in turn, causes the next pass to see that data available for the
+                        * stream. When we come back here, we can be assured that all available
+                        * data has been consumed and we can finally destroy the stream.
+                        *
                         * If the poll flag is HUP/ERR/NVAL and we have
                         * read no data in this pass, we can remove the
                         * stream from its hash table.
                         */
                        if ((pollfd[i].revents & POLLHUP)) {
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               if (!local_stream[i]->data_read) {
+                               if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
                                        consumer_del_stream(local_stream[i], data_ht);
                                        local_stream[i] = NULL;
                                        num_hup++;
                                }
                        }
                        if (local_stream[i] != NULL) {
-                               local_stream[i]->data_read = 0;
+                               local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
                        }
                }
        }
@@ -3006,8 +3019,8 @@ restart:
                                                                &chan->wait_fd_node);
                                                rcu_read_unlock();
                                                /* Add channel to the global poll events list */
-                                               lttng_poll_add(&events, chan->wait_fd,
-                                                               LPOLLERR | LPOLLHUP);
+                                               // FIXME: Empty flag on a pipe pollset, this might hang on FreeBSD.
+                                               lttng_poll_add(&events, chan->wait_fd, 0);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
This page took 0.025613 seconds and 4 git commands to generate.