#include "tracefile-array.hpp"
#include "utils.hpp"
#include "version.hpp"
+#include "viewer-session.hpp"
#include "viewer-stream.hpp"
#include <common/align.hpp>
#include <common/futex.hpp>
#include <common/ini-config/ini-config.hpp>
#include <common/path.hpp>
+#include <common/pthread-lock.hpp>
#include <common/sessiond-comm/inet.hpp>
#include <common/sessiond-comm/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
/* Global relay sessions hash table. */
struct lttng_ht *sessions_ht;
+/* Global viewer sessions hash table. */
+struct lttng_ht *viewer_sessions_ht;
+
/* Relayd health monitoring */
struct health_app *health_relayd;
if (viewer_streams_ht)
lttng_ht_destroy(viewer_streams_ht);
+ if (viewer_sessions_ht) {
+ lttng_ht_destroy(viewer_sessions_ht);
+ }
if (relay_streams_ht)
lttng_ht_destroy(relay_streams_ht);
if (sessions_ht)
lttng_ht_destroy(sessions_ht);
-
free(opt_output_path);
free(opt_working_directory);
*/
static void publish_connection_local_streams(struct relay_connection *conn)
{
-	struct relay_stream *stream;
	struct relay_session *session = conn->session;
+	unsigned int created = 0;
+	bool closed = false;
+
+	LTTNG_ASSERT(viewer_sessions_ht);
	/*
	 * We publish all streams belonging to a session atomically wrt
	 * session lock.
	 */
-	pthread_mutex_lock(&session->lock);
-	lttng::urcu::read_lock_guard read_lock;
-	cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node)
-	{
+	/*
+	 * RAII guard: the session lock is released on every return path.
+	 * The manual pthread_mutex_unlock() at the end of the function is
+	 * therefore removed; keeping it would unlock the mutex twice
+	 * (undefined behavior for a non-recursive mutex).
+	 */
+	const lttng::pthread::lock_guard session_lock(session->lock);
+
+	for (auto *stream :
+	     lttng::urcu::rcu_list_iteration_adapter<relay_stream, &relay_stream::recv_node>(
+		     session->recv_list)) {
		stream_publish(stream);
	}
	/*
	 * Inform the viewer that there are new streams in the session.
	 */
-	if (session->viewer_attached) {
-		uatomic_set(&session->new_streams, 1);
+	if (!session->viewer_attached) {
+		/*
+		 * No attached viewer: nothing to notify and no viewer
+		 * streams to create. Note that `new_streams` is only set
+		 * when a viewer is attached, as before this change.
+		 */
+		return;
	}
+
+	/*
+	 * Create viewer_streams for all the newly published streams for this relay
+	 * session. This searches through all known viewer sessions and finds those
+	 * that are attached to this connection's relay session. This is done so
+	 * that the newer viewer streams will hold a reference on any relay streams
+	 * that already exist, but may be unpublished between now and the next
+	 * GET_NEW_STREAMS from the attached live viewer.
+	 */
+	for (auto *viewer_session :
+	     lttng::urcu::lfht_iteration_adapter<relay_viewer_session,
+						 decltype(relay_viewer_session::viewer_session_n),
+						 &relay_viewer_session::viewer_session_n>(
+		     *viewer_sessions_ht->ht)) {
+		for (auto *session_iter :
+		     lttng::urcu::rcu_list_iteration_adapter<relay_session,
+							     &relay_session::viewer_session_node>(
+			     viewer_session->session_list)) {
+			if (session != session_iter) {
+				continue;
+			}
+
+			const int ret = make_viewer_streams(session,
+							    viewer_session,
+							    LTTNG_VIEWER_SEEK_BEGINNING,
+							    nullptr,
+							    nullptr,
+							    &created,
+							    &closed);
+			if (ret == 0) {
+				DBG("Created %u new viewer streams during publication of relay streams for relay session %" PRIu64,
+				    created,
+				    session->id);
+			} else if (ret < 0) {
+				/*
+				 * Only warn: the creation of the streams is
+				 * retried when the viewer next sends
+				 * GET_NEW_STREAMS.
+				 */
+				WARN("Failed to create new viewer streams during publication of relay streams for relay session %" PRIu64
+				     ", ret=%d, created=%u, closed=%d",
+				     session->id,
+				     ret,
+				     created,
+				     closed);
+			}
+		}
+	}
+
+	uatomic_set(&session->new_streams, 1);
-	pthread_mutex_unlock(&session->lock);
}
{
int ret;
ssize_t send_ret;
- struct lttng_ht_iter iter;
struct lttcomm_relayd_begin_data_pending msg;
struct lttcomm_relayd_generic_reply reply;
- struct relay_stream *stream;
LTTNG_ASSERT(recv_hdr);
LTTNG_ASSERT(conn);
* to iterate over all streams to find the one associated with
* the right session_id.
*/
- {
- lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
- if (!stream_get(stream)) {
- continue;
- }
-
- if (stream->trace->session->id == msg.session_id) {
- pthread_mutex_lock(&stream->lock);
- stream->data_pending_check_done = false;
- pthread_mutex_unlock(&stream->lock);
- DBG("Set begin data pending flag to stream %" PRIu64,
- stream->stream_handle);
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<relay_stream,
+ decltype(relay_stream::node),
+ &relay_stream::node>(*relay_streams_ht->ht)) {
+ if (!stream_get(stream)) {
+ continue;
+ }
- stream_put(stream);
+ if (stream->trace->session->id == msg.session_id) {
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = false;
+ pthread_mutex_unlock(&stream->lock);
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
}
+
+ stream_put(stream);
}
memset(&reply, 0, sizeof(reply));
{
int ret;
ssize_t send_ret;
- struct lttng_ht_iter iter;
struct lttcomm_relayd_end_data_pending msg;
struct lttcomm_relayd_generic_reply reply;
- struct relay_stream *stream;
uint32_t is_data_inflight = 0;
DBG("End data pending command");
* Iterate over all streams to see if the begin data pending
* flag is set.
*/
- {
- lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
- if (!stream_get(stream)) {
- continue;
- }
-
- if (stream->trace->session->id != msg.session_id) {
- stream_put(stream);
- continue;
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<relay_stream,
+ decltype(relay_stream::node),
+ &relay_stream::node>(*relay_streams_ht->ht)) {
+ if (!stream_get(stream)) {
+ continue;
+ }
- pthread_mutex_lock(&stream->lock);
- if (!stream->data_pending_check_done) {
- uint64_t stream_seq;
+ if (stream->trace->session->id != msg.session_id) {
+ stream_put(stream);
+ continue;
+ }
- if (session_streams_have_index(conn->session)) {
- /*
- * Ensure that both the index and stream data have been
- * flushed up to the requested point.
- */
- stream_seq = std::min(stream->prev_data_seq,
- stream->prev_index_seq);
- } else {
- stream_seq = stream->prev_data_seq;
- }
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->data_pending_check_done) {
+ uint64_t stream_seq;
- if (!stream->closed ||
- !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
- is_data_inflight = 1;
- DBG("Data is still in flight for stream %" PRIu64,
- stream->stream_handle);
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
- break;
- }
+ if (session_streams_have_index(conn->session)) {
+ /*
+ * Ensure that both the index and stream data have been
+ * flushed up to the requested point.
+ */
+ stream_seq =
+ std::min(stream->prev_data_seq, stream->prev_index_seq);
+ } else {
+ stream_seq = stream->prev_data_seq;
}
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
+ if (!stream->closed ||
+ !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ break;
+ }
}
+
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
}
memset(&reply, 0, sizeof(reply));
uint32_t nb_fd;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
- struct lttng_ht_iter iter;
- struct relay_connection *destroy_conn = nullptr;
DBG("[thread] Relay worker started");
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
- int pollfd = LTTNG_POLL_GETFD(&events, i);
+ const int pollfd = LTTNG_POLL_GETFD(&events, i);
health_code_update();
/* Process data connection. */
for (i = idx + 1; i < nb_fd; i++) {
/* Fetch the poll data. */
- uint32_t revents = LTTNG_POLL_GETEV(&events, i);
- int pollfd = LTTNG_POLL_GETFD(&events, i);
+ const uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ const int pollfd = LTTNG_POLL_GETFD(&events, i);
struct relay_connection *data_conn;
health_code_update();
exit:
error:
/* Cleanup remaining connection object. */
- {
- lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (
- relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
- health_code_update();
+ for (auto *destroy_conn :
+ lttng::urcu::lfht_iteration_adapter<relay_connection,
+ decltype(relay_connection::sock_n),
+ &relay_connection::sock_n>(
+ *relay_connections_ht->ht)) {
+ health_code_update();
- session_abort(destroy_conn->session);
+ session_abort(destroy_conn->session);
- /*
- * No need to grab another ref, because we own
- * destroy_conn.
- */
- relay_thread_close_connection(
- &events, destroy_conn->sock->fd, destroy_conn);
- }
+ /*
+ * No need to grab another ref, because we own
+ * destroy_conn.
+ */
+ relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
}
(void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
goto exit_options;
}
+ /* tables of viewer sessions indexed by session ID */
+ viewer_sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!viewer_sessions_ht) {
+ retval = -1;
+ goto exit_options;
+ }
+
/* tables of streams indexed by stream ID */
viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!viewer_streams_ht) {