Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-relayd / main.cpp
index 9793e277808975728d5f6965e12f8648de35a9ff..ed5ec9d8f58d18c36a87f57e5fe39530e27115a3 100644 (file)
@@ -25,6 +25,7 @@
 #include "tracefile-array.hpp"
 #include "utils.hpp"
 #include "version.hpp"
+#include "viewer-session.hpp"
 #include "viewer-stream.hpp"
 
 #include <common/align.hpp>
 #include <common/futex.hpp>
 #include <common/ini-config/ini-config.hpp>
 #include <common/path.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/sessiond-comm/inet.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
 #include <common/uri.hpp>
 #include <common/utils.hpp>
 
@@ -167,6 +170,9 @@ struct lttng_ht *viewer_streams_ht;
 /* Global relay sessions hash table. */
 struct lttng_ht *sessions_ht;
 
+/* Global viewer sessions hash table. */
+struct lttng_ht *viewer_sessions_ht;
+
 /* Relayd health monitoring */
 struct health_app *health_relayd;
 
@@ -772,11 +778,13 @@ static void relayd_cleanup()
 
        if (viewer_streams_ht)
                lttng_ht_destroy(viewer_streams_ht);
+       if (viewer_sessions_ht) {
+               lttng_ht_destroy(viewer_sessions_ht);
+       }
        if (relay_streams_ht)
                lttng_ht_destroy(relay_streams_ht);
        if (sessions_ht)
                lttng_ht_destroy(sessions_ht);
-
        free(opt_output_path);
        free(opt_working_directory);
 
@@ -1320,12 +1328,13 @@ static void *relay_thread_dispatcher(void *data __attribute__((unused)))
                         * the data will be read at some point in time
                         * or wait to the end of the world :)
                         */
-                       ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT
-                                                                                              sizeof
-                                                                                              used
-                                                                                              on a
-                                                                                              pointer.
-                                                                                            */
+                       ret = lttng_write(
+                               relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT
+                                                                                    sizeof
+                                                                                    used
+                                                                                    on a
+                                                                                    pointer.
+                                                                                  */
                        if (ret < 0) {
                                PERROR("write connection pipe");
                                connection_put(new_conn);
@@ -1509,27 +1518,79 @@ end:
  */
 static void publish_connection_local_streams(struct relay_connection *conn)
 {
-       struct relay_stream *stream;
        struct relay_session *session = conn->session;
+       unsigned int created = 0;
+       bool closed = false;
+
+       LTTNG_ASSERT(viewer_sessions_ht);
 
        /*
         * We publish all streams belonging to a session atomically wrt
         * session lock.
         */
-       pthread_mutex_lock(&session->lock);
-       rcu_read_lock();
-       cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node)
-       {
+       const lttng::pthread::lock_guard session_lock(session->lock);
+
+       for (auto *stream :
+            lttng::urcu::rcu_list_iteration_adapter<relay_stream, &relay_stream::recv_node>(
+                    session->recv_list)) {
                stream_publish(stream);
        }
-       rcu_read_unlock();
 
        /*
         * Inform the viewer that there are new streams in the session.
         */
-       if (session->viewer_attached) {
-               uatomic_set(&session->new_streams, 1);
+       if (!session->viewer_attached) {
+               goto unlock;
+       }
+
+       /*
+        * Create viewer_streams for all the newly published streams for this relay session.
+        * This searches through all known viewer sessions and finds those that are
+        * attached to this connection's relay session. This is done so that the newer
+        * viewer streams will hold a reference on any relay streams that already exist,
+        * but may be unpublished between now and the next GET_NEW_STREAMS from the
+        * attached live viewer.
+        */
+       for (auto *viewer_session :
+            lttng::urcu::lfht_iteration_adapter<relay_viewer_session,
+                                                decltype(relay_viewer_session::viewer_session_n),
+                                                &relay_viewer_session::viewer_session_n>(
+                    *viewer_sessions_ht->ht)) {
+               for (auto *session_iter :
+                    lttng::urcu::rcu_list_iteration_adapter<relay_session,
+                                                            &relay_session::viewer_session_node>(
+                            viewer_session->session_list)) {
+                       if (session != session_iter) {
+                               continue;
+                       }
+                       const int ret = make_viewer_streams(session,
+                                                           viewer_session,
+                                                           LTTNG_VIEWER_SEEK_BEGINNING,
+                                                           nullptr,
+                                                           nullptr,
+                                                           &created,
+                                                           &closed);
+                       if (ret == 0) {
+                               DBG("Created %d new viewer streams during publication of relay streams for relay session %" PRIu64,
+                                   created,
+                                   session->id);
+                       } else if (ret < 0) {
+                               /*
+                                * Warning, since the creation of the
+                                * streams will be retried when the viewer
+                                * next sends the GET_NEW_STREAMS again.
+                                */
+                               WARN("Failed to create new viewer streams during publication of relay streams for relay session %" PRIu64
+                                    ", ret=%d, created=%d, closed=%d",
+                                    session->id,
+                                    ret,
+                                    created,
+                                    closed);
+                       }
+               }
        }
+unlock:
+       uatomic_set(&session->new_streams, 1);
        pthread_mutex_unlock(&session->lock);
 }
 
@@ -2192,10 +2253,8 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 {
        int ret;
        ssize_t send_ret;
-       struct lttng_ht_iter iter;
        struct lttcomm_relayd_begin_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
-       struct relay_stream *stream;
 
        LTTNG_ASSERT(recv_hdr);
        LTTNG_ASSERT(conn);
@@ -2224,11 +2283,14 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
         * to iterate over all streams to find the one associated with
         * the right session_id.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<relay_stream,
+                                                decltype(relay_stream::node),
+                                                &relay_stream::node>(*relay_streams_ht->ht)) {
                if (!stream_get(stream)) {
                        continue;
                }
+
                if (stream->trace->session->id == msg.session_id) {
                        pthread_mutex_lock(&stream->lock);
                        stream->data_pending_check_done = false;
@@ -2236,9 +2298,9 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                        DBG("Set begin data pending flag to stream %" PRIu64,
                            stream->stream_handle);
                }
+
                stream_put(stream);
        }
-       rcu_read_unlock();
 
        memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
@@ -2271,10 +2333,8 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
 {
        int ret;
        ssize_t send_ret;
-       struct lttng_ht_iter iter;
        struct lttcomm_relayd_end_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
-       struct relay_stream *stream;
        uint32_t is_data_inflight = 0;
 
        DBG("End data pending command");
@@ -2299,15 +2359,19 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
         * Iterate over all streams to see if the begin data pending
         * flag is set.
         */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<relay_stream,
+                                                decltype(relay_stream::node),
+                                                &relay_stream::node>(*relay_streams_ht->ht)) {
                if (!stream_get(stream)) {
                        continue;
                }
+
                if (stream->trace->session->id != msg.session_id) {
                        stream_put(stream);
                        continue;
                }
+
                pthread_mutex_lock(&stream->lock);
                if (!stream->data_pending_check_done) {
                        uint64_t stream_seq;
@@ -2322,6 +2386,7 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
                        } else {
                                stream_seq = stream->prev_data_seq;
                        }
+
                        if (!stream->closed ||
                            !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
@@ -2332,10 +2397,10 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
                                break;
                        }
                }
+
                pthread_mutex_unlock(&stream->lock);
                stream_put(stream);
        }
-       rcu_read_unlock();
 
        memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
@@ -3888,8 +3953,6 @@ static void *relay_thread_worker(void *data __attribute__((unused)))
        uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
-       struct lttng_ht_iter iter;
-       struct relay_connection *destroy_conn = nullptr;
 
        DBG("[thread] Relay worker started");
 
@@ -4067,7 +4130,7 @@ restart:
 
                if (last_seen_data_fd >= 0) {
                        for (i = 0; i < nb_fd; i++) {
-                               int pollfd = LTTNG_POLL_GETFD(&events, i);
+                               const int pollfd = LTTNG_POLL_GETFD(&events, i);
 
                                health_code_update();
 
@@ -4081,8 +4144,8 @@ restart:
                /* Process data connection. */
                for (i = idx + 1; i < nb_fd; i++) {
                        /* Fetch the poll data. */
-                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-                       int pollfd = LTTNG_POLL_GETFD(&events, i);
+                       const uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+                       const int pollfd = LTTNG_POLL_GETFD(&events, i);
                        struct relay_connection *data_conn;
 
                        health_code_update();
@@ -4161,8 +4224,11 @@ restart:
 exit:
 error:
        /* Cleanup remaining connection object. */
-       rcu_read_lock();
-       cds_lfht_for_each_entry (relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
+       for (auto *destroy_conn :
+            lttng::urcu::lfht_iteration_adapter<relay_connection,
+                                                decltype(relay_connection::sock_n),
+                                                &relay_connection::sock_n>(
+                    *relay_connections_ht->ht)) {
                health_code_update();
 
                session_abort(destroy_conn->session);
@@ -4173,7 +4239,6 @@ error:
                 */
                relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
        }
-       rcu_read_unlock();
 
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 error_poll_create:
@@ -4388,6 +4453,13 @@ int main(int argc, char **argv)
                goto exit_options;
        }
 
+       /* tables of viewer sessions indexed by session ID */
+       viewer_sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!viewer_sessions_ht) {
+               retval = -1;
+               goto exit_options;
+       }
+
        /* tables of streams indexed by stream ID */
        viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!viewer_streams_ht) {
This page took 0.029006 seconds and 4 git commands to generate.