Fix: LPOLLHUP and LPOLLERR when there is still data in pipe/socket
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 9 Sep 2015 15:56:33 +0000 (11:56 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 24 Sep 2015 04:35:35 +0000 (00:35 -0400)
The event mask returned by poll/epoll is a bitwise mask made of all the
events observed. On bidirectional sockets, there are cases where
combinations of LPOLLHUP/LPOLLERR and LPOLLIN/LPOLLPRI can be raised at
the same time.

Currently the overall behavior in sessiond, consumerd and relayd is to
handle LPOLLHUP or LPOLLERR immediately, whether or not there is still
data to read in the socket. Unfortunately, this behavior may discard the
last information made available on the pipe or socket.

Audit all uses of LPOLLHUP and LPOLLERR on sockets on which we expect
data to ensure that we deal with LPOLLIN or LPOLLPRI, and catch the
hangup when read or recvmsg returns 0. Keep the LPOLLHUP and LPOLLERR
handling, but only when LPOLLIN is not raised, just in case some
unforeseen error happens when sending the reply.

This is one correct case where we can handle LPOLLHUP and LPOLLERR
directly without caring about LPOLLIN: sockets where we are expected to
write and then read the reply (e.g. command sockets). It is then OK
for a dedicated thread to watch for LPOLLHUP and LPOLLERR.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-consumerd/health-consumerd.c
src/bin/lttng-relayd/health-relayd.c
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/agent-thread.c
src/bin/lttng-sessiond/ht-cleanup.c
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/ust-thread.c
src/common/consumer.c

index e8afd963f7c058ee1cb258cc7db6def5ea48d76f..93737c5d311a01ff07bae1e0c78e29a1a871c551 100644 (file)
@@ -274,7 +274,8 @@ restart:
 
                        /* Event on the registration socket */
                        if (pollfd == sock) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+                                               && !(revents & LPOLLIN)) {
                                        ERR("Health socket poll error");
                                        goto error;
                                }
index 1feaaeb374220f30ba1a9fb23ea4d848863817f7..0deb3740acaa203a693c134977a3bb883d637886 100644 (file)
@@ -344,9 +344,14 @@ restart:
 
                        /* Event on the registration socket */
                        if (pollfd == sock) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               if (revents & LPOLLIN) {
+                                       continue;
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        ERR("Health socket poll error");
                                        goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        }
                }
index f8d8ec9758d0cee76c902869ad823cb6f0c10f91..8a74853999c18d93ce1df7356942fa3b794c70d7 100644 (file)
@@ -541,10 +541,7 @@ restart:
                                goto exit;
                        }
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("socket poll error");
-                               goto error;
-                       } else if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
                                /*
                                 * A new connection is requested, therefore a
                                 * viewer connection is allocated in this
@@ -587,6 +584,12 @@ restart:
                                 * exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&viewer_conn_queue.futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -1907,10 +1910,7 @@ restart:
 
                        /* Inspect the relay conn pipe for new connection. */
                        if (pollfd == live_conn_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Relay live pipe error");
-                                       goto error;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        struct relay_connection *conn;
 
                                        ret = lttng_read(live_conn_pipe[0],
@@ -1922,6 +1922,12 @@ restart:
                                                        LPOLLIN | LPOLLRDHUP);
                                        connection_ht_add(viewer_connections_ht, conn);
                                        DBG("Connection socket %d added to poll", conn->sock->fd);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Relay live pipe error");
+                                       goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        } else {
                                /* Connection activity. */
@@ -1932,11 +1938,7 @@ restart:
                                        continue;
                                }
 
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       cleanup_connection_pollfd(&events, pollfd);
-                                       /* Put "create" ownership reference. */
-                                       connection_put(conn);
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
                                                        sizeof(recv_hdr), 0);
                                        if (ret <= 0) {
@@ -1955,6 +1957,14 @@ restart:
                                                        DBG("Viewer connection closed with %d", pollfd);
                                                }
                                        }
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       cleanup_connection_pollfd(&events, pollfd);
+                                       /* Put "create" ownership reference. */
+                                       connection_put(conn);
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       connection_put(conn);
+                                       goto error;
                                }
                                /* Put local "get_by_sock" reference. */
                                connection_put(conn);
index 936a259b0d1385aca70b8a442ce8acbe3d4bd6af..30c7bd1675525e56e213b089d3a0ce2ed5eee0dc 100644 (file)
@@ -832,10 +832,7 @@ restart:
                                goto exit;
                        }
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("socket poll error");
-                               goto error;
-                       } else if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
                                /*
                                 * A new connection is requested, therefore a
                                 * sessiond/consumerd connection is allocated in
@@ -887,6 +884,12 @@ restart:
                                 * exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&relay_conn_queue.futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
index de5fd8de8a80cf851f291a0b5c2ed7fabf1825b6..31e3e8d3c056736065b5327cd887b26edae3083d 100644 (file)
@@ -295,35 +295,22 @@ restart:
                                goto exit;
                        }
 
-                       /*
-                        * Check first if this is a POLLERR since POLLIN is also included
-                        * in an error value thus checking first.
-                        */
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               /* Removing from the poll set */
-                               ret = lttng_poll_del(&events, pollfd);
-                               if (ret < 0) {
-                                       goto error;
-                               }
-
-                               agent_destroy_app_by_sock(pollfd);
-                       } else if (revents & (LPOLLIN)) {
+                       if (revents & LPOLLIN) {
                                int new_fd;
                                struct agent_app *app = NULL;
 
-                               /* Pollin event of agent app socket should NEVER happen. */
                                assert(pollfd == reg_sock->fd);
-
                                new_fd = handle_registration(reg_sock, &app);
                                if (new_fd < 0) {
-                                       WARN("[agent-thread] agent registration failed. Ignoring.");
-                                       /* Somehow the communication failed. Just continue. */
                                        continue;
                                }
                                /* Should not have a NULL app on success. */
                                assert(app);
 
-                               /* Only add poll error event to only detect shutdown. */
+                               /*
+                                * Since this is a command socket (write then read),
+                                * only add poll error event to only detect shutdown.
+                                */
                                ret = lttng_poll_add(&events, new_fd,
                                                LPOLLERR | LPOLLHUP | LPOLLRDHUP);
                                if (ret < 0) {
@@ -335,10 +322,26 @@ restart:
                                update_agent_app(app);
 
                                /* On failure, the poll will detect it and clean it up. */
-                               (void) agent_send_registration_done(app);
+                               ret = agent_send_registration_done(app);
+                               if (ret < 0) {
+                                       /* Removing from the poll set */
+                                       ret = lttng_poll_del(&events, new_fd);
+                                       if (ret < 0) {
+                                               goto error;
+                                       }
+                                       agent_destroy_app_by_sock(new_fd);
+                                       continue;
+                               }
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               /* Removing from the poll set */
+                               ret = lttng_poll_del(&events, pollfd);
+                               if (ret < 0) {
+                                       goto error;
+                               }
+                               agent_destroy_app_by_sock(pollfd);
                        } else {
-                               ERR("Unknown poll events %u for sock %d", revents, pollfd);
-                               continue;
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
index 57d6aec118bc6f5ac1df1df3057d63077b82b315..c477e1a649624fd38d9b82e4ec66ccc970689d28 100644 (file)
@@ -106,32 +106,31 @@ restart:
                        }
                        assert(pollfd == ht_cleanup_pipe[0]);
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                       if (revents & LPOLLIN) {
+                               /* Get socket from dispatch thread. */
+                               size_ret = lttng_read(ht_cleanup_pipe[0], &ht,
+                                               sizeof(ht));
+                               if (size_ret < sizeof(ht)) {
+                                       PERROR("ht cleanup notify pipe");
+                                       goto error;
+                               }
+                               health_code_update();
+                               /*
+                                * The whole point of this thread is to call
+                                * lttng_ht_destroy from a context that is NOT:
+                                * 1) a read-side RCU lock,
+                                * 2) a call_rcu thread.
+                                */
+                               lttng_ht_destroy(ht);
+
+                               health_code_update();
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                ERR("ht cleanup pipe error");
                                goto error;
-                       } else if (!(revents & LPOLLIN)) {
-                               /* No POLLIN and not a catched error, stop the thread. */
-                               ERR("ht cleanup failed. revent: %u", revents);
-                               goto error;
-                       }
-
-                       /* Get socket from dispatch thread. */
-                       size_ret = lttng_read(ht_cleanup_pipe[0], &ht,
-                                       sizeof(ht));
-                       if (size_ret < sizeof(ht)) {
-                               PERROR("ht cleanup notify pipe");
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                                goto error;
                        }
-                       health_code_update();
-                       /*
-                        * The whole point of this thread is to call
-                        * lttng_ht_destroy from a context that is NOT:
-                        * 1) a read-side RCU lock,
-                        * 2) a call_rcu thread.
-                        */
-                       lttng_ht_destroy(ht);
-
-                       health_code_update();
                }
        }
 
index 022adf33edca6c4b51731d208e25c0a12c25eb3d..b8791553670fad96ca8c720a10c0feb1b37f6b63 100644 (file)
@@ -1051,31 +1051,33 @@ static void *thread_manage_kernel(void *data)
                        }
 
                        /* Check for data on kernel pipe */
-                       if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) {
-                               (void) lttng_read(kernel_poll_pipe[0],
-                                       &tmp, 1);
-                               /*
-                                * Ret value is useless here, if this pipe gets any actions an
-                                * update is required anyway.
-                                */
-                               update_poll_flag = 1;
-                               continue;
-                       } else {
-                               /*
-                                * New CPU detected by the kernel. Adding kernel stream to
-                                * kernel session and updating the kernel consumer
-                                */
-                               if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
+                               if (pollfd == kernel_poll_pipe[0]) {
+                                       (void) lttng_read(kernel_poll_pipe[0],
+                                               &tmp, 1);
+                                       /*
+                                        * Ret value is useless here, if this pipe gets any actions an
+                                        * update is required anyway.
+                                        */
+                                       update_poll_flag = 1;
+                                       continue;
+                               } else {
+                                       /*
+                                        * New CPU detected by the kernel. Adding kernel stream to
+                                        * kernel session and updating the kernel consumer
+                                        */
                                        ret = update_kernel_stream(&kconsumer_data, pollfd);
                                        if (ret < 0) {
                                                continue;
                                        }
                                        break;
-                                       /*
-                                        * TODO: We might want to handle the LPOLLERR | LPOLLHUP
-                                        * and unregister kernel stream at this point.
-                                        */
                                }
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               update_poll_flag = 1;
+                               continue;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -1205,9 +1207,14 @@ restart:
 
                /* Event on the registration socket */
                if (pollfd == consumer_data->err_sock) {
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                       if (revents & LPOLLIN) {
+                               continue;
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                ERR("consumer err socket poll error");
                                goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -1337,7 +1344,8 @@ restart_poll:
 
                        if (pollfd == sock) {
                                /* Event on the consumerd socket */
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+                                               && !(revents & LPOLLIN)) {
                                        ERR("consumer err socket second poll error");
                                        goto error;
                                }
@@ -1355,6 +1363,11 @@ restart_poll:
 
                                goto exit;
                        } else if (pollfd == consumer_data->metadata_fd) {
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+                                               && !(revents & LPOLLIN)) {
+                                       ERR("consumer err metadata socket second poll error");
+                                       goto error;
+                               }
                                /* UST metadata requests */
                                ret = ust_consumer_metadata_request(
                                                &consumer_data->metadata_sock);
@@ -1523,10 +1536,7 @@ static void *thread_manage_apps(void *data)
 
                        /* Inspect the apps cmd pipe */
                        if (pollfd == apps_cmd_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Apps command pipe error");
-                                       goto error;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        int sock;
 
                                        /* Empty pipe */
@@ -1539,9 +1549,8 @@ static void *thread_manage_apps(void *data)
                                        health_code_update();
 
                                        /*
-                                        * We only monitor the error events of the socket. This
-                                        * thread does not handle any incoming data from UST
-                                        * (POLLIN).
+                                        * Since this is a command socket (write then read),
+                                        * we only monitor the error events of the socket.
                                         */
                                        ret = lttng_poll_add(&events, sock,
                                                        LPOLLERR | LPOLLHUP | LPOLLRDHUP);
@@ -1550,6 +1559,12 @@ static void *thread_manage_apps(void *data)
                                        }
 
                                        DBG("Apps with sock %d added to poll set", sock);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Apps command pipe error");
+                                       goto error;
+                               } else {
+                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        } else {
                                /*
@@ -1565,6 +1580,9 @@ static void *thread_manage_apps(void *data)
 
                                        /* Socket closed on remote end. */
                                        ust_app_unregister(pollfd);
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        }
 
@@ -1706,6 +1724,9 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
                                ust_app_destroy(wait_node->app);
                                free(wait_node);
                                break;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -2066,10 +2087,7 @@ static void *thread_registration_apps(void *data)
 
                        /* Event on the registration socket */
                        if (pollfd == apps_sock) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                                       ERR("Register apps socket poll error");
-                                       goto error;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        sock = lttcomm_accept_unix_sock(apps_sock);
                                        if (sock < 0) {
                                                goto error;
@@ -2156,6 +2174,12 @@ static void *thread_registration_apps(void *data)
                                         * barrier with the exchange in cds_wfcq_enqueue.
                                         */
                                        futex_nto1_wake(&ust_cmd_queue.futex);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Register apps socket poll error");
+                                       goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        }
                }
@@ -3958,9 +3982,14 @@ restart:
 
                        /* Event on the registration socket */
                        if (pollfd == sock) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               if (revents & LPOLLIN) {
+                                       continue;
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        ERR("Health socket poll error");
                                        goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        }
                }
@@ -4135,9 +4164,14 @@ static void *thread_manage_clients(void *data)
 
                        /* Event on the registration socket */
                        if (pollfd == client_sock) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               if (revents & LPOLLIN) {
+                                       continue;
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        ERR("Client socket poll error");
                                        goto error;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                        }
                }
index 35f82e3914c2bde40fc9567ef40131eae7d14765..272720fb92f2acf166d1ffe2f65864321583b362 100644 (file)
@@ -56,7 +56,8 @@ void *ust_thread_manage_notify(void *data)
        }
 
        /* Add notify pipe to the pollset. */
-       ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0], LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0],
+                       LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
        if (ret < 0) {
                goto error;
        }
@@ -108,45 +109,56 @@ restart:
                        if (pollfd == apps_cmd_notify_pipe[0]) {
                                int sock;
 
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               if (revents & LPOLLIN) {
+                                       /* Get socket from dispatch thread. */
+                                       size_ret = lttng_read(apps_cmd_notify_pipe[0],
+                                                       &sock, sizeof(sock));
+                                       if (size_ret < sizeof(sock)) {
+                                               PERROR("read apps notify pipe");
+                                               goto error;
+                                       }
+                                       health_code_update();
+
+                                       ret = lttng_poll_add(&events, sock,
+                                                       LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
+                                       if (ret < 0) {
+                                               /*
+                                                * It's possible we've reached the max poll fd allowed.
+                                                * Let's close the socket but continue normal execution.
+                                                */
+                                               ret = close(sock);
+                                               if (ret) {
+                                                       PERROR("close notify socket %d", sock);
+                                               }
+                                               lttng_fd_put(LTTNG_FD_APPS, 1);
+                                               continue;
+                                       }
+                                       DBG3("UST thread notify added sock %d to pollset", sock);
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        ERR("Apps notify command pipe error");
                                        goto error;
-                               } else if (!(revents & LPOLLIN)) {
-                                       /* No POLLIN and not a catched error, stop the thread. */
-                                       ERR("Notify command pipe failed. revent: %u", revents);
-                                       goto error;
-                               }
-
-                               /* Get socket from dispatch thread. */
-                               size_ret = lttng_read(apps_cmd_notify_pipe[0],
-                                               &sock, sizeof(sock));
-                               if (size_ret < sizeof(sock)) {
-                                       PERROR("read apps notify pipe");
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                                        goto error;
                                }
-                               health_code_update();
-
-                               ret = lttng_poll_add(&events, sock,
-                                               LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
-                               if (ret < 0) {
-                                       /*
-                                        * It's possible we've reached the max poll fd allowed.
-                                        * Let's close the socket but continue normal execution.
-                                        */
-                                       ret = close(sock);
-                                       if (ret) {
-                                               PERROR("close notify socket %d", sock);
-                                       }
-                                       lttng_fd_put(LTTNG_FD_APPS, 1);
-                                       continue;
-                               }
-                               DBG3("UST thread notify added sock %d to pollset", sock);
                        } else {
                                /*
                                 * At this point, we know that a registered application
                                 * triggered the event.
                                 */
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               if (revents & (LPOLLIN | LPOLLPRI)) {
+                                       ret = ust_app_recv_notify(pollfd);
+                                       if (ret < 0) {
+                                               /* Removing from the poll set */
+                                               ret = lttng_poll_del(&events, pollfd);
+                                               if (ret < 0) {
+                                                       goto error;
+                                               }
+
+                                               /* The socket is closed after a grace period here. */
+                                               ust_app_notify_sock_unregister(pollfd);
+                                       }
+                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        /* Removing from the poll set */
                                        ret = lttng_poll_del(&events, pollfd);
                                        if (ret < 0) {
@@ -155,22 +167,9 @@ restart:
 
                                        /* The socket is closed after a grace period here. */
                                        ust_app_notify_sock_unregister(pollfd);
-                               } else if (revents & (LPOLLIN | LPOLLPRI)) {
-                                       ret = ust_app_recv_notify(pollfd);
-                                       if (ret < 0) {
-                                               /*
-                                                * If the notification failed either the application is
-                                                * dead or an internal error happened. In both cases,
-                                                * we can only continue here. If the application is
-                                                * dead, an unregistration will follow or else the
-                                                * application will notice that we are not responding
-                                                * on that socket and will close it.
-                                                */
-                                               continue;
-                                       }
                                } else {
-                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
-                                       continue;
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto error;
                                }
                                health_code_update();
                        }
index 0b59c511cf02446939dd87266c4e4044604f3824..eed346d06253a2e44ec0298ab846b3e386da839f 100644 (file)
@@ -2217,26 +2217,22 @@ restart:
                        }
 
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
-                               if (revents & (LPOLLERR | LPOLLHUP )) {
-                                       DBG("Metadata thread pipe hung up");
-                                       /*
-                                        * Remove the pipe from the poll set and continue the loop
-                                        * since their might be data to consume.
-                                        */
-                                       lttng_poll_del(&events,
-                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
-                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
-                                       continue;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        ssize_t pipe_len;
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
                                        if (pipe_len < sizeof(stream)) {
-                                               PERROR("read metadata stream");
+                                               if (pipe_len < 0) {
+                                                       PERROR("read metadata stream");
+                                               }
                                                /*
-                                                * Continue here to handle the rest of the streams.
+                                                * Remove the pipe from the poll set and continue the loop
+                                                * since their might be data to consume.
                                                 */
+                                               lttng_poll_del(&events,
+                                                               lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                               lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                                                continue;
                                        }
 
@@ -2253,6 +2249,19 @@ restart:
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI | LPOLLHUP);
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Metadata thread pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
                                }
 
                                /* Handle other stream */
@@ -2271,8 +2280,30 @@ restart:
                        stream = caa_container_of(node, struct lttng_consumer_stream,
                                        node);
 
-                       /* Check for error event */
-                       if (revents & (LPOLLERR | LPOLLHUP)) {
+                       if (revents & (LPOLLIN | LPOLLPRI)) {
+                               /* Get the data out of the metadata file descriptor */
+                               DBG("Metadata available on fd %d", pollfd);
+                               assert(stream->wait_fd == pollfd);
+
+                               do {
+                                       health_code_update();
+
+                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       /*
+                                        * We don't check the return value here since if we get
+                                        * a negative len, it means an error occured thus we
+                                        * simply remove it from the poll set and free the
+                                        * stream.
+                                        */
+                               } while (len > 0);
+
+                               /* It's ok to have an unavailable sub-buffer */
+                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
+                                       /* Clean up stream from consumer and free it. */
+                                       lttng_poll_del(&events, stream->wait_fd);
+                                       consumer_del_metadata_stream(stream, metadata_ht);
+                               }
+                       } else if (revents & (LPOLLERR | LPOLLHUP)) {
                                DBG("Metadata fd %d is hup|err.", pollfd);
                                if (!stream->hangup_flush_done
                                                && (consumer_data.type == LTTNG_CONSUMER32_UST
@@ -2300,31 +2331,11 @@ restart:
                                 * and securely free the stream.
                                 */
                                consumer_del_metadata_stream(stream, metadata_ht);
-                       } else if (revents & (LPOLLIN | LPOLLPRI)) {
-                               /* Get the data out of the metadata file descriptor */
-                               DBG("Metadata available on fd %d", pollfd);
-                               assert(stream->wait_fd == pollfd);
-
-                               do {
-                                       health_code_update();
-
-                                       len = ctx->on_buffer_ready(stream, ctx);
-                                       /*
-                                        * We don't check the return value here since if we get
-                                        * a negative len, it means an error occured thus we
-                                        * simply remove it from the poll set and free the
-                                        * stream.
-                                        */
-                               } while (len > 0);
-
-                               /* It's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       /* Clean up stream from consumer and free it. */
-                                       lttng_poll_del(&events, stream->wait_fd);
-                                       consumer_del_metadata_stream(stream, metadata_ht);
-                               }
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               rcu_read_unlock;
+                               goto end;
                        }
-
                        /* Release RCU lock for the stream looked up */
                        rcu_read_unlock();
                }
@@ -2789,21 +2800,16 @@ restart:
                        }
 
                        if (pollfd == ctx->consumer_channel_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP)) {
-                                       DBG("Channel thread pipe hung up");
-                                       /*
-                                        * Remove the pipe from the poll set and continue the loop
-                                        * since their might be data to consume.
-                                        */
-                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
-                                       continue;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        enum consumer_channel_action action;
                                        uint64_t key;
 
                                        ret = read_channel_pipe(ctx, &chan, &key, &action);
                                        if (ret <= 0) {
-                                               ERR("Error reading channel pipe");
+                                               if (ret < 0) {
+                                                       ERR("Error reading channel pipe");
+                                               }
+                                               lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
                                                continue;
                                        }
 
@@ -2820,7 +2826,7 @@ restart:
                                                rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                lttng_poll_add(&events, chan->wait_fd,
-                                                               LPOLLIN | LPOLLPRI);
+                                                               LPOLLERR | LPOLLHUP);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
@@ -2880,6 +2886,17 @@ restart:
                                                ERR("Unknown action");
                                                break;
                                        }
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Channel thread pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
                                }
 
                                /* Handle other stream */
@@ -2918,6 +2935,10 @@ restart:
                                                && !uatomic_read(&chan->nb_init_stream_left)) {
                                        consumer_del_channel(chan);
                                }
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               rcu_read_unlock();
+                               goto end;
                        }
 
                        /* Release RCU lock for the channel looked up */
This page took 0.040015 seconds and 4 git commands to generate.