#include <common/utils.h>
#include "cmd.h"
+#include "ctf-trace.h"
#include "index.h"
#include "utils.h"
#include "lttng-relayd.h"
+#include "live.h"
/* command line options */
char *opt_output_path;
static int opt_daemon;
static struct lttng_uri *control_uri;
static struct lttng_uri *data_uri;
+static struct lttng_uri *live_uri;
const char *progname;
static uid_t relayd_uid;
static gid_t relayd_gid;
+/* Global relay stream hash table. */
+struct lttng_ht *relay_streams_ht;
+
+/* Global relay viewer stream hash table. */
+struct lttng_ht *viewer_streams_ht;
+
/*
* usage function on stderr
*/
goto exit;
}
}
+ if (live_uri == NULL) {
+ ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
+ DEFAULT_NETWORK_VIEWER_PORT);
+ if (ret < 0) {
+ PERROR("asprintf default viewer control address");
+ goto exit;
+ }
+
+ ret = uri_parse(default_address, &live_uri);
+ free(default_address);
+ if (ret < 0) {
+ ERR("Invalid viewer control URI specified");
+ goto exit;
+ }
+ }
exit:
return ret;
* Get stream from stream id.
* Need to be called with RCU read-side lock held.
*/
-static
-struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
- struct lttng_ht *streams_ht)
+struct relay_stream *relay_stream_find_by_id(uint64_t stream_id)
{
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
struct relay_stream *ret;
- lttng_ht_lookup(streams_ht,
+ lttng_ht_lookup(relay_streams_ht,
(void *)((unsigned long) stream_id),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
{
struct relay_stream *stream =
caa_container_of(head, struct relay_stream, rcu_node);
+
+ ctf_trace_try_destroy(stream->ctf_trace);
+
free(stream->path_name);
free(stream->channel_name);
free(stream);
}
+static
+void deferred_free_session(struct rcu_head *head)
+{
+ struct relay_session *session =
+ caa_container_of(head, struct relay_session, rcu_node);
+ free(session);
+}
+
+static void close_stream(struct relay_stream *stream,
+ struct lttng_ht *ctf_traces_ht)
+{
+ int delret;
+ struct relay_viewer_stream *vstream;
+ struct lttng_ht_iter iter;
+
+ assert(stream);
+
+ delret = close(stream->fd);
+ if (delret < 0) {
+ PERROR("close stream");
+ }
+
+ if (stream->index_fd >= 0) {
+ delret = close(stream->index_fd);
+ if (delret < 0) {
+ PERROR("close stream index_fd");
+ }
+ }
+
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+ if (vstream) {
+ /*
+ * Set the last good value into the viewer stream. This is done
+ * right before the stream gets deleted from the hash table. The
+ * lookup failure on the live thread side of a stream indicates
+ * that the viewer stream index received value should be used.
+ */
+ vstream->total_index_received = stream->total_index_received;
+ }
+
+ iter.iter.node = &stream->stream_n.node;
+ delret = lttng_ht_del(relay_streams_ht, &iter);
+ assert(!delret);
+ iter.iter.node = &stream->ctf_trace_node.node;
+ delret = lttng_ht_del(ctf_traces_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node, deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+}
+
/*
* relay_delete_session: Free all memory associated with a session and
* close all the FDs
*/
static
-void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht)
+void relay_delete_session(struct relay_command *cmd,
+ struct lttng_ht *sessions_ht)
{
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
DBG("Relay deleting session %" PRIu64, cmd->session->id);
rcu_read_lock();
- cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
stream = caa_container_of(node,
if (ret < 0) {
PERROR("close stream fd on delete session");
}
- ret = lttng_ht_del(streams_ht, &iter);
+ ret = lttng_ht_del(relay_streams_ht, &iter);
assert(!ret);
call_rcu(&stream->rcu_node,
deferred_free_stream);
indexes_ht);
}
}
+ iter.iter.node = &cmd->session->session_n.node;
+ ret = lttng_ht_del(sessions_ht, &iter);
+ assert(!ret);
+ call_rcu(&cmd->session->rcu_node,
+ deferred_free_session);
rcu_read_unlock();
-
- free(cmd->session);
}
/*
*/
static
int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd)
+ struct relay_command *cmd,
+ struct lttng_ht *sessions_ht)
{
int ret = 0, send_ret;
struct relay_session *session;
reply.session_id = htobe64(session->id);
+ switch (cmd->minor) {
+ case 4: /* LTTng sessiond 2.4 */
+ default:
+ ret = cmd_create_session_2_4(cmd, session);
+ break;
+ }
+
+ lttng_ht_node_init_ulong(&session->session_n,
+ (unsigned long) session->id);
+ lttng_ht_add_unique_ulong(sessions_ht,
+ &session->session_n);
+
DBG("Created session %" PRIu64, session->id);
error:
*/
static
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd, struct lttng_ht *sessions_ht)
{
struct relay_session *session = cmd->session;
struct relay_stream *stream = NULL;
stream->prev_seq = -1ULL;
stream->session = session;
stream->index_fd = -1;
+ stream->read_index_fd = -1;
+ stream->ctf_trace = NULL;
+ pthread_mutex_init(&stream->lock, NULL);
ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
if (ret < 0) {
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ stream->metadata_flag = 1;
+ /*
+ * When we receive a new metadata stream, we create a new
+ * ctf_trace and we assign this ctf_trace to all streams with
+ * the same path.
+ *
+ * If later on we receive a new stream for the same ctf_trace,
+ * we copy the information from the first hit in the HT to the
+ * new stream.
+ */
+ stream->ctf_trace = ctf_trace_create();
+ if (!stream->ctf_trace) {
+ ret = -1;
+ goto end;
+ }
+ stream->ctf_trace->refcount++;
+ stream->ctf_trace->metadata_stream = stream;
+ }
+ ctf_trace_assign(cmd->ctf_traces_ht, stream);
+
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
- lttng_ht_add_unique_ulong(streams_ht,
+ lttng_ht_add_unique_ulong(relay_streams_ht,
&stream->stream_n);
+ lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name);
+ lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node);
+
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
stream->stream_handle);
*/
static
int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd)
{
+ int ret, send_ret;
struct relay_session *session = cmd->session;
struct lttcomm_relayd_close_stream stream_info;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
- int ret, send_ret;
- struct lttng_ht_iter iter;
DBG("Close stream received");
}
rcu_read_lock();
- stream = relay_stream_from_stream_id(be64toh(stream_info.stream_id),
- streams_ht);
+ stream = relay_stream_find_by_id(be64toh(stream_info.stream_id));
if (!stream) {
ret = -1;
goto end_unlock;
stream->close_flag = 1;
if (close_stream_check(stream)) {
- int delret;
-
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
- }
-
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
- }
- }
- iter.iter.node = &stream->stream_n.node;
- delret = lttng_ht_del(streams_ht, &iter);
- assert(!delret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d from close stream", stream->fd);
+ close_stream(stream, cmd->ctf_traces_ht);
}
end_unlock:
*/
static
int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd)
{
int ret = htobe32(LTTNG_OK);
struct relay_session *session = cmd->session;
metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
rcu_read_lock();
- metadata_stream = relay_stream_from_stream_id(
- be64toh(metadata_struct->stream_id), streams_ht);
+ metadata_stream = relay_stream_find_by_id(
+ be64toh(metadata_struct->stream_id));
if (!metadata_stream) {
ret = -1;
goto end_unlock;
if (ret < 0) {
goto end_unlock;
}
+ metadata_stream->ctf_trace->metadata_received +=
+ payload_size + be32toh(metadata_struct->padding_size);
DBG2("Relay metadata written");
*/
static
int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd, struct lttng_ht *sessions_ht)
{
int ret;
struct lttcomm_relayd_version reply, msg;
if (reply.major != be32toh(msg.major)) {
DBG("Incompatible major versions (%u vs %u), deleting session",
reply.major, be32toh(msg.major));
- relay_delete_session(cmd, streams_ht);
+ relay_delete_session(cmd, sessions_ht);
ret = 0;
goto end;
}
*/
static
int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd)
{
struct relay_session *session = cmd->session;
struct lttcomm_relayd_data_pending msg;
last_net_seq_num = be64toh(msg.last_net_seq_num);
rcu_read_lock();
- stream = relay_stream_from_stream_id(stream_id, streams_ht);
+ stream = relay_stream_find_by_id(stream_id);
if (stream == NULL) {
ret = -1;
goto end_unlock;
*/
static
int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd)
{
int ret;
uint64_t stream_id;
stream_id = be64toh(msg.stream_id);
rcu_read_lock();
- cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ stream_n.node) {
if (stream->stream_handle == stream_id) {
stream->data_pending_check_done = 1;
DBG("Relay quiescent control pending flag set to %" PRIu64,
*/
static
int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd)
{
int ret;
struct lttng_ht_iter iter;
assert(recv_hdr);
assert(cmd);
- assert(streams_ht);
DBG("Init streams for data pending");
* streams to find the one associated with the right session_id.
*/
rcu_read_lock();
- cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ stream_n.node) {
if (stream->session->id == session_id) {
stream->data_pending_check_done = 0;
DBG("Set begin data pending flag to stream %" PRIu64,
*/
static
int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht)
+ struct relay_command *cmd)
{
int ret;
struct lttng_ht_iter iter;
assert(recv_hdr);
assert(cmd);
- assert(streams_ht);
DBG("End data pending command");
/* Iterate over all streams to see if the begin data pending flag is set. */
rcu_read_lock();
- cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ stream_n.node) {
if (stream->session->id == session_id &&
!stream->data_pending_check_done) {
is_data_inflight = 1;
*/
static
int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht,
- struct lttng_ht *indexes_ht)
+ struct relay_command *cmd, struct lttng_ht *indexes_ht)
{
int ret, send_ret, index_created = 0;
struct relay_session *session = cmd->session;
uint64_t net_seq_num;
assert(cmd);
- assert(streams_ht);
assert(indexes_ht);
DBG("Relay receiving index");
net_seq_num = be64toh(index_info.net_seq_num);
rcu_read_lock();
- stream = relay_stream_from_stream_id(be64toh(index_info.relay_stream_id),
- streams_ht);
+ stream = relay_stream_find_by_id(be64toh(index_info.relay_stream_id));
if (!stream) {
ret = -1;
goto end_rcu_unlock;
}
+ /* Live beacon handling */
+ if (index_info.packet_size == 0) {
+ DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
+
+ /*
+ * Only flag a stream inactive when it has already received data.
+ */
+ if (stream->total_index_received > 0) {
+ stream->beacon_ts_end = be64toh(index_info.timestamp_end);
+ }
+ ret = 0;
+ goto end_rcu_unlock;
+ } else {
+ stream->beacon_ts_end = -1ULL;
+ }
+
index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
if (!index) {
/* A successful creation will add the object to the HT. */
if (ret < 0) {
goto end_rcu_unlock;
}
+ stream->total_index_received++;
}
end_rcu_unlock:
}
/*
- * relay_process_control: Process the commands received on the control socket
+ * Process the commands received on the control socket
*/
static
int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *streams_ht,
- struct lttng_ht *index_streams_ht,
- struct lttng_ht *indexes_ht)
+ struct relay_command *cmd, struct relay_local_data *ctx)
{
int ret = 0;
switch (be32toh(recv_hdr->cmd)) {
case RELAYD_CREATE_SESSION:
- ret = relay_create_session(recv_hdr, cmd);
+ ret = relay_create_session(recv_hdr, cmd, ctx->sessions_ht);
break;
case RELAYD_ADD_STREAM:
- ret = relay_add_stream(recv_hdr, cmd, streams_ht);
+ ret = relay_add_stream(recv_hdr, cmd, ctx->sessions_ht);
break;
case RELAYD_START_DATA:
ret = relay_start(recv_hdr, cmd);
break;
case RELAYD_SEND_METADATA:
- ret = relay_recv_metadata(recv_hdr, cmd, streams_ht);
+ ret = relay_recv_metadata(recv_hdr, cmd);
break;
case RELAYD_VERSION:
- ret = relay_send_version(recv_hdr, cmd, streams_ht);
+ ret = relay_send_version(recv_hdr, cmd, ctx->sessions_ht);
break;
case RELAYD_CLOSE_STREAM:
- ret = relay_close_stream(recv_hdr, cmd, streams_ht);
+ ret = relay_close_stream(recv_hdr, cmd);
break;
case RELAYD_DATA_PENDING:
- ret = relay_data_pending(recv_hdr, cmd, streams_ht);
+ ret = relay_data_pending(recv_hdr, cmd);
break;
case RELAYD_QUIESCENT_CONTROL:
- ret = relay_quiescent_control(recv_hdr, cmd, streams_ht);
+ ret = relay_quiescent_control(recv_hdr, cmd);
break;
case RELAYD_BEGIN_DATA_PENDING:
- ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);
+ ret = relay_begin_data_pending(recv_hdr, cmd);
break;
case RELAYD_END_DATA_PENDING:
- ret = relay_end_data_pending(recv_hdr, cmd, streams_ht);
+ ret = relay_end_data_pending(recv_hdr, cmd);
break;
case RELAYD_SEND_INDEX:
- ret = relay_recv_index(recv_hdr, cmd, streams_ht, indexes_ht);
+ ret = relay_recv_index(recv_hdr, cmd, indexes_ht);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
* relay_process_data: Process the data received on the data socket
*/
static
-int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht,
+int relay_process_data(struct relay_command *cmd,
struct lttng_ht *indexes_ht)
{
int ret = 0, rotate_index = 0, index_created = 0;
stream_id = be64toh(data_hdr.stream_id);
rcu_read_lock();
- stream = relay_stream_from_stream_id(stream_id, streams_ht);
+ stream = relay_stream_find_by_id(stream_id);
if (!stream) {
ret = -1;
goto end_rcu_unlock;
if (ret < 0) {
goto end_rcu_unlock;
}
+ stream->total_index_received++;
}
do {
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- int cret;
- struct lttng_ht_iter iter;
-
- cret = close(stream->fd);
- if (cret < 0) {
- PERROR("close stream process data");
- }
-
- cret = close(stream->index_fd);
- if (cret < 0) {
- PERROR("close stream index_fd");
- }
- iter.iter.node = &stream->stream_n.node;
- ret = lttng_ht_del(streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d after recv data", stream->fd);
+ close_stream(stream, cmd->ctf_traces_ht);
}
end_rcu_unlock:
goto error_read;
}
+ relay_connection->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
+ if (!relay_connection->ctf_traces_ht) {
+ goto error_read;
+ }
+
lttng_ht_node_init_ulong(&relay_connection->sock_n,
(unsigned long) relay_connection->sock->fd);
rcu_read_lock();
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
+ lttng_ht_destroy(relay_connection->ctf_traces_ht);
lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
static
void relay_del_connection(struct lttng_ht *relay_connections_ht,
- struct lttng_ht *streams_ht, struct lttng_ht_iter *iter,
- struct relay_command *relay_connection)
+ struct lttng_ht_iter *iter, struct relay_command *relay_connection,
+ struct lttng_ht *sessions_ht)
{
int ret;
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
if (relay_connection->type == RELAY_CONTROL) {
- relay_delete_session(relay_connection, streams_ht);
+ relay_delete_session(relay_connection, sessions_ht);
}
call_rcu(&relay_connection->rcu_node,
struct lttng_ht *relay_connections_ht;
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
- struct lttng_ht *streams_ht;
- struct lttng_ht *index_streams_ht;
struct lttcomm_relayd_hdr recv_hdr;
+ struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
+ struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
DBG("[thread] Relay worker started");
goto relay_connections_ht_error;
}
- /* tables of streams indexed by stream ID */
- streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- if (!streams_ht) {
- goto streams_ht_error;
- }
-
/* Tables of received indexes indexed by index handle and net_seq_num. */
indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64);
if (!indexes_ht) {
ERR("POLL ERROR");
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
+ &iter, relay_connection, sessions_ht);
if (last_seen_data_fd == pollfd) {
last_seen_data_fd = last_notdel_data_fd;
}
DBG("Socket %d hung up", pollfd);
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
+ &iter, relay_connection, sessions_ht);
if (last_seen_data_fd == pollfd) {
last_seen_data_fd = last_notdel_data_fd;
}
if (ret <= 0) {
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
+ &iter, relay_connection, sessions_ht);
DBG("Control connection closed with %d", pollfd);
} else {
if (relay_connection->session) {
relay_connection->session->id);
}
ret = relay_process_control(&recv_hdr,
- relay_connection,
- streams_ht,
- index_streams_ht,
- indexes_ht);
+ relay_connection, relay_ctx);
if (ret < 0) {
/* Clear the session on error. */
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
+ &iter, relay_connection, sessions_ht);
DBG("Connection closed with %d", pollfd);
}
seen_control = 1;
continue;
}
- ret = relay_process_data(relay_connection, streams_ht,
- indexes_ht);
+ ret = relay_process_data(relay_connection, indexes_ht);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
+ &iter, relay_connection, sessions_ht);
DBG("Data connection closed with %d", pollfd);
/*
* Every goto restart call sets the last seen fd where
relay_connection = caa_container_of(node,
struct relay_command, sock_n);
relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
+ &iter, relay_connection, sessions_ht);
}
}
- rcu_read_unlock();
error_poll_create:
- lttng_ht_destroy(indexes_ht);
+ {
+ struct relay_index *index;
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
+ relay_index_delete(index, indexes_ht);
+ }
+ lttng_ht_destroy(indexes_ht);
+ }
+ rcu_read_unlock();
indexes_ht_error:
- lttng_ht_destroy(streams_ht);
-streams_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
/* Close relay cmd pipes */
{
int ret = 0;
void *status;
+ struct relay_local_data *relay_ctx;
/* Create thread quit pipe */
if ((ret = init_thread_quit_pipe()) < 0) {
/* Initialize communication library */
lttcomm_init();
+ relay_ctx = zmalloc(sizeof(struct relay_local_data));
+ if (!relay_ctx) {
+ PERROR("relay_ctx");
+ goto exit;
+ }
+
+ /* tables of sessions indexed by session ID */
+ relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (!relay_ctx->sessions_ht) {
+ goto exit_relay_ctx_sessions;
+ }
+
+ /* tables of streams indexed by stream ID */
+ relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (!relay_streams_ht) {
+ goto exit_relay_ctx_streams;
+ }
+
+ /* tables of streams indexed by stream ID */
+ viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!viewer_streams_ht) {
+ goto exit_relay_ctx_viewer_streams;
+ }
+
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
/* Setup the worker thread */
ret = pthread_create(&worker_thread, NULL,
- relay_thread_worker, (void *) NULL);
+ relay_thread_worker, (void *) relay_ctx);
if (ret != 0) {
PERROR("pthread_create worker");
goto exit_worker;
goto exit_listener;
}
+ ret = live_start_threads(live_uri, relay_ctx);
+ if (ret != 0) {
+ ERR("Starting live viewer threads");
+ }
+
exit_listener:
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
+ lttng_ht_destroy(viewer_streams_ht);
+
+exit_relay_ctx_viewer_streams:
+ lttng_ht_destroy(relay_streams_ht);
+
+exit_relay_ctx_streams:
+ lttng_ht_destroy(relay_ctx->sessions_ht);
+
+exit_relay_ctx_sessions:
+ free(relay_ctx);
exit:
+ live_stop_threads();
cleanup();
if (!ret) {
exit(EXIT_SUCCESS);