Fix: LPOLLHUP and LPOLLERR when there is still data in pipe/socket
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 1c90eaddff02ff1023a0193d34443258981e8355..30c7bd1675525e56e213b089d3a0ce2ed5eee0dc 100644 (file)
@@ -832,10 +832,7 @@ restart:
                                goto exit;
                        }
 
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("socket poll error");
-                               goto error;
-                       } else if (revents & LPOLLIN) {
+                       if (revents & LPOLLIN) {
                                /*
                                 * A new connection is requested, therefore a
                                 * sessiond/consumerd connection is allocated in
@@ -887,6 +884,12 @@ restart:
                                 * exchange in cds_wfcq_enqueue.
                                 */
                                futex_nto1_wake(&relay_conn_queue.futex);
+                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                               ERR("socket poll error");
+                               goto error;
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               goto error;
                        }
                }
        }
@@ -1243,8 +1246,15 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
        pthread_mutex_unlock(&stream->lock);
 
-       stream_close(stream);
-
+       /*
+        * This is one of the conditions which may trigger a stream close
+        * with the others being:
+        *     1) A close command is received for a stream
+        *     2) The control connection owning the stream is closed
+        *     3) We have received all of the stream's data _after_ a close
+        *        request.
+        */
+       try_stream_close(stream);
        if (stream->is_metadata) {
                struct relay_viewer_stream *vstream;
 
@@ -1363,7 +1373,7 @@ end:
 static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
                struct relay_connection *conn)
 {
-       int ret = htobe32(LTTNG_OK);
+       int ret = 0;
        ssize_t size_ret;
        struct relay_session *session = conn->session;
        struct lttcomm_relayd_metadata_payload *metadata_struct;
@@ -1400,9 +1410,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
        }
        memset(data_buffer, 0, data_size);
        DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
-       ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
-       if (ret < 0 || ret != data_size) {
-               if (ret == 0) {
+       size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+       if (size_ret < 0 || size_ret != data_size) {
+               if (size_ret == 0) {
                        /* Orderly shutdown. Not necessary to print an error. */
                        DBG("Socket %d did an orderly shutdown", conn->sock->fd);
                } else {
@@ -1429,9 +1439,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
                goto end_put;
        }
 
-       ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+       size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
                        be32toh(metadata_struct->padding_size));
-       if (ret < 0) {
+       if (size_ret < 0) {
                goto end_put;
        }
 
@@ -1443,7 +1453,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
 end_put:
        pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
-
 end:
        return ret;
 }
@@ -2124,7 +2133,7 @@ static int relay_process_data(struct relay_connection *conn)
        uint64_t net_seq_num;
        uint32_t data_size;
        struct relay_session *session;
-       bool new_stream = false;
+       bool new_stream = false, close_requested = false;
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2243,7 +2252,12 @@ static int relay_process_data(struct relay_connection *conn)
        stream->prev_seq = net_seq_num;
 
 end_stream_unlock:
+       close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
+       if (close_requested) {
+               try_stream_close(stream);
+       }
+
        if (new_stream) {
                pthread_mutex_lock(&session->lock);
                uatomic_set(&session->new_streams, 1);
This page took 0.025169 seconds and 4 git commands to generate.