Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-relayd / main.cpp
index 1de1f1853009e10cac954e690edd2aeaa2b25c4f..ed5ec9d8f58d18c36a87f57e5fe39530e27115a3 100644 (file)
@@ -25,6 +25,7 @@
 #include "tracefile-array.hpp"
 #include "utils.hpp"
 #include "version.hpp"
+#include "viewer-session.hpp"
 #include "viewer-stream.hpp"
 
 #include <common/align.hpp>
@@ -42,6 +43,7 @@
 #include <common/futex.hpp>
 #include <common/ini-config/ini-config.hpp>
 #include <common/path.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/sessiond-comm/inet.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
@@ -168,6 +170,9 @@ struct lttng_ht *viewer_streams_ht;
 /* Global relay sessions hash table. */
 struct lttng_ht *sessions_ht;
 
+/* Global viewer sessions hash table. */
+struct lttng_ht *viewer_sessions_ht;
+
 /* Relayd health monitoring */
 struct health_app *health_relayd;
 
@@ -773,11 +778,13 @@ static void relayd_cleanup()
 
        if (viewer_streams_ht)
                lttng_ht_destroy(viewer_streams_ht);
+       if (viewer_sessions_ht) {
+               lttng_ht_destroy(viewer_sessions_ht);
+       }
        if (relay_streams_ht)
                lttng_ht_destroy(relay_streams_ht);
        if (sessions_ht)
                lttng_ht_destroy(sessions_ht);
-
        free(opt_output_path);
        free(opt_working_directory);
 
@@ -1511,26 +1518,78 @@ end:
  */
 static void publish_connection_local_streams(struct relay_connection *conn)
 {
-       struct relay_stream *stream;
        struct relay_session *session = conn->session;
+       unsigned int created = 0;
+       bool closed = false;
+
+       LTTNG_ASSERT(viewer_sessions_ht);
 
        /*
         * We publish all streams belonging to a session atomically wrt
         * session lock.
         */
-       pthread_mutex_lock(&session->lock);
-       lttng::urcu::read_lock_guard read_lock;
-       cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node)
-       {
+       const lttng::pthread::lock_guard session_lock(session->lock);
+
+       for (auto *stream :
+            lttng::urcu::rcu_list_iteration_adapter<relay_stream, &relay_stream::recv_node>(
+                    session->recv_list)) {
                stream_publish(stream);
        }
 
        /*
         * Inform the viewer that there are new streams in the session.
         */
-       if (session->viewer_attached) {
-               uatomic_set(&session->new_streams, 1);
+       if (!session->viewer_attached) {
+               return;
        }
+
+       /*
+        * Create viewer_streams for all the newly published streams of this
+        * relay session. All known viewer sessions are searched for those
+        * attached to this connection's relay session, so that the new viewer
+        * streams hold a reference on any relay streams that already exist
+        * but may be unpublished between now and the next GET_NEW_STREAMS
+        * from the attached live viewer.
+        */
+       for (auto *viewer_session :
+            lttng::urcu::lfht_iteration_adapter<relay_viewer_session,
+                                                decltype(relay_viewer_session::viewer_session_n),
+                                                &relay_viewer_session::viewer_session_n>(
+                    *viewer_sessions_ht->ht)) {
+               for (auto *session_iter :
+                    lttng::urcu::rcu_list_iteration_adapter<relay_session,
+                                                            &relay_session::viewer_session_node>(
+                            viewer_session->session_list)) {
+                       if (session != session_iter) {
+                               continue;
+                       }
+                       const int ret = make_viewer_streams(session,
+                                                           viewer_session,
+                                                           LTTNG_VIEWER_SEEK_BEGINNING,
+                                                           nullptr,
+                                                           nullptr,
+                                                           &created,
+                                                           &closed);
+                       if (ret == 0) {
+                               DBG("Created %u new viewer streams during publication of relay streams for relay session %" PRIu64,
+                                   created,
+                                   session->id);
+                       } else if (ret < 0) {
+                               /*
+                                * Warn only, since the creation of the
+                                * streams will be retried the next time
+                                * the viewer sends GET_NEW_STREAMS.
+                                */
+                               WARN("Failed to create new viewer streams during publication of relay streams for relay session %" PRIu64
+                                    ", ret=%d, created=%d, closed=%d",
+                                    session->id,
+                                    ret,
+                                    created,
+                                    closed);
+                       }
+               }
+       }
-       pthread_mutex_unlock(&session->lock);
+
+       uatomic_set(&session->new_streams, 1);
 }
 
@@ -2193,10 +2253,8 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 {
        int ret;
        ssize_t send_ret;
-       struct lttng_ht_iter iter;
        struct lttcomm_relayd_begin_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
-       struct relay_stream *stream;
 
        LTTNG_ASSERT(recv_hdr);
        LTTNG_ASSERT(conn);
@@ -2225,24 +2283,23 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
         * to iterate over all streams to find the one associated with
         * the right session_id.
         */
-       {
-               lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-                       if (!stream_get(stream)) {
-                               continue;
-                       }
-
-                       if (stream->trace->session->id == msg.session_id) {
-                               pthread_mutex_lock(&stream->lock);
-                               stream->data_pending_check_done = false;
-                               pthread_mutex_unlock(&stream->lock);
-                               DBG("Set begin data pending flag to stream %" PRIu64,
-                                   stream->stream_handle);
-                       }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<relay_stream,
+                                                decltype(relay_stream::node),
+                                                &relay_stream::node>(*relay_streams_ht->ht)) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
 
-                       stream_put(stream);
+               if (stream->trace->session->id == msg.session_id) {
+                       pthread_mutex_lock(&stream->lock);
+                       stream->data_pending_check_done = false;
+                       pthread_mutex_unlock(&stream->lock);
+                       DBG("Set begin data pending flag to stream %" PRIu64,
+                           stream->stream_handle);
                }
+
+               stream_put(stream);
        }
 
        memset(&reply, 0, sizeof(reply));
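
This hunk (like the end-data-pending one below) keeps the stream_get()/stream_put() pairing around each visited stream: the iteration adapter holds the RCU read-side lock, which keeps the node memory valid, but dereferencing a stream's fields additionally requires taking a reference so the object is not torn down mid-use; stream_get() fails when the refcount has already hit zero. A self-contained sketch of that "increment unless zero" pattern, with hypothetical types rather than the relayd implementation:

    #include <atomic>
    #include <cassert>

    struct stream {
            std::atomic<unsigned int> refcount{1};
    };

    /* Take a reference unless the object is already being destroyed. */
    static bool stream_get(stream& s)
    {
            unsigned int current = s.refcount.load();
            do {
                    if (current == 0) {
                            /* Too late: a concurrent stream_put() already
                             * dropped the last reference. */
                            return false;
                    }
            } while (!s.refcount.compare_exchange_weak(current, current + 1));
            return true;
    }

    static void stream_put(stream& s)
    {
            if (s.refcount.fetch_sub(1) == 1) {
                    /* Last reference dropped; the real code would defer
                     * reclamation through RCU here. */
            }
    }

    int main()
    {
            stream s;
            assert(stream_get(s)); /* 1 -> 2 */
            stream_put(s);         /* 2 -> 1 */
            stream_put(s);         /* 1 -> 0: logically dead */
            assert(!stream_get(s));
            return 0;
    }
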
@@ -2276,10 +2333,8 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
 {
        int ret;
        ssize_t send_ret;
-       struct lttng_ht_iter iter;
        struct lttcomm_relayd_end_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
-       struct relay_stream *stream;
        uint32_t is_data_inflight = 0;
 
        DBG("End data pending command");
@@ -2304,48 +2359,47 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
         * Iterate over all streams to see if the begin data pending
         * flag is set.
         */
-       {
-               lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-                       if (!stream_get(stream)) {
-                               continue;
-                       }
-
-                       if (stream->trace->session->id != msg.session_id) {
-                               stream_put(stream);
-                               continue;
-                       }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<relay_stream,
+                                                decltype(relay_stream::node),
+                                                &relay_stream::node>(*relay_streams_ht->ht)) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
 
-                       pthread_mutex_lock(&stream->lock);
-                       if (!stream->data_pending_check_done) {
-                               uint64_t stream_seq;
+               if (stream->trace->session->id != msg.session_id) {
+                       stream_put(stream);
+                       continue;
+               }
 
-                               if (session_streams_have_index(conn->session)) {
-                                       /*
-                                        * Ensure that both the index and stream data have been
-                                        * flushed up to the requested point.
-                                        */
-                                       stream_seq = std::min(stream->prev_data_seq,
-                                                             stream->prev_index_seq);
-                               } else {
-                                       stream_seq = stream->prev_data_seq;
-                               }
+               pthread_mutex_lock(&stream->lock);
+               if (!stream->data_pending_check_done) {
+                       uint64_t stream_seq;
 
-                               if (!stream->closed ||
-                                   !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
-                                       is_data_inflight = 1;
-                                       DBG("Data is still in flight for stream %" PRIu64,
-                                           stream->stream_handle);
-                                       pthread_mutex_unlock(&stream->lock);
-                                       stream_put(stream);
-                                       break;
-                               }
+                       if (session_streams_have_index(conn->session)) {
+                               /*
+                                * Ensure that both the index and stream data have been
+                                * flushed up to the requested point.
+                                */
+                               stream_seq =
+                                       std::min(stream->prev_data_seq, stream->prev_index_seq);
+                       } else {
+                               stream_seq = stream->prev_data_seq;
                        }
 
-                       pthread_mutex_unlock(&stream->lock);
-                       stream_put(stream);
+                       if (!stream->closed ||
+                           !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+                               is_data_inflight = 1;
+                               DBG("Data is still in flight for stream %" PRIu64,
+                                   stream->stream_handle);
+                               pthread_mutex_unlock(&stream->lock);
+                               stream_put(stream);
+                               break;
+                       }
                }
+
+               pthread_mutex_unlock(&stream->lock);
+               stream_put(stream);
        }
 
        memset(&reply, 0, sizeof(reply));
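
The condition preserved by this hunk, ((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0, is wraparound-safe sequence-number arithmetic: subtracting two uint64_t counters and reinterpreting the difference as signed gives their relative order even if one counter has wrapped, and data is considered still in flight unless the stream is closed and that test holds. A small self-contained illustration:

    #include <cassert>
    #include <cstdint>

    /* True when 'a' has reached or passed 'b', even across u64 wraparound. */
    static bool seq_reached(uint64_t a, uint64_t b)
    {
            return static_cast<int64_t>(a - b) >= 0;
    }

    int main()
    {
            assert(seq_reached(10, 10));  /* equal: reached */
            assert(seq_reached(11, 10));  /* one past */
            assert(!seq_reached(9, 10));  /* one behind */
            /* 'a' wrapped past UINT64_MAX while 'b' has not: the unsigned
             * difference is small, so the signed test is still positive. */
            assert(seq_reached(2, UINT64_MAX - 1));
            assert(!seq_reached(UINT64_MAX - 1, 2));
            return 0;
    }
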
@@ -3899,8 +3953,6 @@ static void *relay_thread_worker(void *data __attribute__((unused)))
        uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
-       struct lttng_ht_iter iter;
-       struct relay_connection *destroy_conn = nullptr;
 
        DBG("[thread] Relay worker started");
 
@@ -4078,7 +4130,7 @@ restart:
 
                if (last_seen_data_fd >= 0) {
                        for (i = 0; i < nb_fd; i++) {
-                               int pollfd = LTTNG_POLL_GETFD(&events, i);
+                               const int pollfd = LTTNG_POLL_GETFD(&events, i);
 
                                health_code_update();
 
@@ -4092,8 +4144,8 @@ restart:
                /* Process data connection. */
                for (i = idx + 1; i < nb_fd; i++) {
                        /* Fetch the poll data. */
-                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-                       int pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+                       const int pollfd = LTTNG_POLL_GETFD(&events, i);
                        struct relay_connection *data_conn;
 
                        health_code_update();
@@ -4172,22 +4224,20 @@ restart:
 exit:
 error:
        /* Cleanup remaining connection object. */
-       {
-               lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
-                       health_code_update();
+       for (auto *destroy_conn :
+            lttng::urcu::lfht_iteration_adapter<relay_connection,
+                                                decltype(relay_connection::sock_n),
+                                                &relay_connection::sock_n>(
+                    *relay_connections_ht->ht)) {
+               health_code_update();
 
-                       session_abort(destroy_conn->session);
+               session_abort(destroy_conn->session);
 
-                       /*
-                        * No need to grab another ref, because we own
-                        * destroy_conn.
-                        */
-                       relay_thread_close_connection(
-                               &events, destroy_conn->sock->fd, destroy_conn);
-               }
+               /*
+                * No need to grab another ref, because we own
+                * destroy_conn.
+                */
+               relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
        }
 
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
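
The lfht_iteration_adapter and rcu_list_iteration_adapter templates used throughout this patch are parameterized on the element type and a pointer to the intrusive node member embedded in it (here relay_connection::sock_n); from each node found in the hash table or list, they recover the enclosing object the way caa_container_of() does in C. A self-contained sketch of that recovery step only, with hypothetical types (the real adapters also manage the RCU read-side lock):

    #include <cstddef>
    #include <cstdio>

    /* Intrusive hook embedded in the enclosing object, playing the role
     * of relay_connection::sock_n in the hunk above. */
    struct list_node {
            list_node *next;
    };

    struct connection {
            int fd;
            list_node node;
    };

    /* Recover the enclosing connection from a pointer to its embedded
     * node: step back by the member's offset within the struct. */
    static connection *connection_of(list_node *n)
    {
            return reinterpret_cast<connection *>(
                    reinterpret_cast<char *>(n) - offsetof(connection, node));
    }

    int main()
    {
            connection conn = { 42, { nullptr } };
            std::printf("fd=%d\n", connection_of(&conn.node)->fd); /* fd=42 */
            return 0;
    }
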
@@ -4403,6 +4453,13 @@ int main(int argc, char **argv)
                goto exit_options;
        }
 
+       /* tables of viewer sessions indexed by session ID */
+       viewer_sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!viewer_sessions_ht) {
+               retval = -1;
+               goto exit_options;
+       }
+
        /* tables of streams indexed by stream ID */
        viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!viewer_streams_ht) {
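
The new table follows the same allocate-or-bail convention as its siblings: each failed allocation sets retval and jumps to a single cleanup label, and relayd_cleanup() later destroys only the tables that were actually created. A generic sketch of that idiom, with plain malloc() standing in for lttng_ht_new():

    #include <cstdlib>

    int main()
    {
            int retval = 0;
            void *table_a = nullptr;
            void *table_b = nullptr;

            table_a = std::malloc(64);
            if (!table_a) {
                    retval = -1;
                    goto cleanup;
            }

            table_b = std::malloc(64);
            if (!table_b) {
                    retval = -1;
                    goto cleanup;
            }

            /* ... normal operation would run here ... */

    cleanup:
            /* free(nullptr) is a no-op, so unconditional cleanup is safe;
             * the relayd equivalent guards each lttng_ht_destroy() with a
             * null check instead. */
            std::free(table_b);
            std::free(table_a);
            return retval;
    }
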