#include "utils.h"
#include "lttng-relayd.h"
#include "live.h"
+#include "health-relayd.h"
/* command line options */
char *opt_output_path;
static char *data_buffer;
static unsigned int data_buffer_size;
-/* Global hash table that stores relay index object. */
-static struct lttng_ht *indexes_ht;
-
/* We need those values for the file/dir creation. */
static uid_t relayd_uid;
static gid_t relayd_gid;
/* Global relay stream hash table. */
struct lttng_ht *relay_streams_ht;
+/* Global relay viewer stream hash table. */
+struct lttng_ht *viewer_streams_ht;
+
+/* Global hash table that stores relay index object. */
+struct lttng_ht *indexes_ht;
+
+/* Relayd health monitoring */
+struct health_app *health_relayd;
+
/*
* usage function on stderr
*/
DBG("[thread] Relay listener started");
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
+
+ health_code_update();
+
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
}
while (1) {
+ health_code_update();
+
DBG("Listener accepting connections");
restart:
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
- DBG("Thread exited with error");
+ health_error();
+ ERR("Health error occurred in %s", __func__);
}
+ health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
stop_threads();
return NULL;
static
void *relay_thread_dispatcher(void *data)
{
- int ret;
+ int ret, err = -1;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
DBG("[thread] Relay dispatcher started");
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
+
+ health_code_update();
+
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ health_code_update();
+
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
+ health_code_update();
+
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
+ health_poll_entry();
futex_nto1_wait(&relay_cmd_queue.futex);
+ health_poll_exit();
}
+ /* Normal exit, no error */
+ err = 0;
+
error:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);
DBG("Dispatch thread dying");
stop_threads();
return NULL;
free(session);
}
+/*
+ * Close a given stream. The stream is freed using a call RCU.
+ *
+ * RCU read side lock MUST be acquired. If NO close_stream_check() was called
+ * BEFORE the stream lock MUST be acquired.
+ */
+static void destroy_stream(struct relay_stream *stream,
+ struct lttng_ht *ctf_traces_ht)
+{
+ int delret;
+ struct relay_viewer_stream *vstream;
+ struct lttng_ht_iter iter;
+
+ assert(stream);
+
+ delret = close(stream->fd);
+ if (delret < 0) {
+ PERROR("close stream");
+ }
+
+ if (stream->index_fd >= 0) {
+ delret = close(stream->index_fd);
+ if (delret < 0) {
+ PERROR("close stream index_fd");
+ }
+ }
+
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+ if (vstream) {
+ /*
+ * Set the last good value into the viewer stream. This is done
+ * right before the stream gets deleted from the hash table. The
+ * lookup failure on the live thread side of a stream indicates
+ * that the viewer stream index received value should be used.
+ */
+ vstream->total_index_received = stream->total_index_received;
+ }
+
+ /* Cleanup index of that stream. */
+ relay_index_destroy_by_stream_id(stream->stream_handle);
+
+ iter.iter.node = &stream->stream_n.node;
+ delret = lttng_ht_del(relay_streams_ht, &iter);
+ assert(!delret);
+ iter.iter.node = &stream->ctf_trace_node.node;
+ delret = lttng_ht_del(ctf_traces_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node, deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+}
+
/*
* relay_delete_session: Free all memory associated with a session and
* close all the FDs
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
- if (node) {
- stream = caa_container_of(node,
- struct relay_stream, stream_n);
- if (stream->session == cmd->session) {
- ret = close(stream->fd);
- if (ret < 0) {
- PERROR("close stream fd on delete session");
- }
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- }
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle,
- indexes_ht);
+ if (!node) {
+ continue;
+ }
+ stream = caa_container_of(node, struct relay_stream, stream_n);
+ if (stream->session == cmd->session) {
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
}
+
+ /* Make this session not visible anymore. */
iter.iter.node = &cmd->session->session_n.node;
ret = lttng_ht_del(sessions_ht, &iter);
assert(!ret);
- call_rcu(&cmd->session->rcu_node,
- deferred_free_session);
+ call_rcu(&cmd->session->rcu_node, deferred_free_session);
rcu_read_unlock();
}
session->id = ++last_relay_session_id;
session->sock = cmd->sock;
+ session->minor = cmd->minor;
+ session->major = cmd->major;
cmd->session = session;
reply.session_id = htobe64(session->id);
*/
static
int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *viewer_streams_ht)
+ struct relay_command *cmd)
{
+ int ret, send_ret;
struct relay_session *session = cmd->session;
struct lttcomm_relayd_close_stream stream_info;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
- int ret, send_ret;
- struct lttng_ht_iter iter;
DBG("Close stream received");
stream->close_flag = 1;
if (close_stream_check(stream)) {
- int delret;
- struct relay_viewer_stream *vstream;
-
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
- }
-
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
- }
- }
-
- vstream = live_find_viewer_stream_by_id(stream->stream_handle,
- viewer_streams_ht);
- if (vstream) {
- /*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
- */
- vstream->total_index_received = stream->total_index_received;
- }
-
- iter.iter.node = &stream->stream_n.node;
- delret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!delret);
- iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(cmd->ctf_traces_ht, &iter);
- assert(!delret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d from close stream", stream->fd);
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
end_unlock:
*/
static
int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *indexes_ht)
+ struct relay_command *cmd)
{
int ret, send_ret, index_created = 0;
struct relay_session *session = cmd->session;
uint64_t net_seq_num;
assert(cmd);
- assert(indexes_ht);
DBG("Relay receiving index");
stream->beacon_ts_end = -1ULL;
}
- index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
+ index = relay_index_find(stream->stream_handle, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* already exist, destroy back the index created, set the data in this
* object and write it on disk.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
copy_index_control_data(wr_index, &index_info);
free(index);
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
ret = relay_send_version(recv_hdr, cmd, ctx->sessions_ht);
break;
case RELAYD_CLOSE_STREAM:
- ret = relay_close_stream(recv_hdr, cmd, ctx->viewer_streams_ht);
+ ret = relay_close_stream(recv_hdr, cmd);
break;
case RELAYD_DATA_PENDING:
ret = relay_data_pending(recv_hdr, cmd);
ret = relay_end_data_pending(recv_hdr, cmd);
break;
case RELAYD_SEND_INDEX:
- ret = relay_recv_index(recv_hdr, cmd, indexes_ht);
+ ret = relay_recv_index(recv_hdr, cmd);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
return ret;
}
+/*
+ * Handle index for a data stream.
+ *
+ * RCU read side lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
+ int rotate_index)
+{
+ int ret = 0, index_created = 0;
+ uint64_t stream_id, data_offset;
+ struct relay_index *index, *wr_index = NULL;
+
+ assert(stream);
+
+ stream_id = stream->stream_handle;
+ /* Get data offset because we are about to update the index. */
+ data_offset = htobe64(stream->tracefile_size_current);
+
+ /*
+ * Lookup for an existing index for that stream id/sequence number. If on
+ * exists, the control thread already received the data for it thus we need
+ * to write it on disk.
+ */
+ index = relay_index_find(stream_id, net_seq_num);
+ if (!index) {
+ /* A successful creation will add the object to the HT. */
+ index = relay_index_create(stream_id, net_seq_num);
+ if (!index) {
+ ret = -1;
+ goto error;
+ }
+ index_created = 1;
+ }
+
+ if (rotate_index || stream->index_fd < 0) {
+ index->to_close_fd = stream->index_fd;
+ ret = index_create_file(stream->path_name, stream->channel_name,
+ relayd_uid, relayd_gid, stream->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ /* This will close the stream's index fd if one. */
+ relay_index_free_safe(index);
+ goto error;
+ }
+ stream->index_fd = ret;
+ }
+ index->fd = stream->index_fd;
+ index->index_data.offset = data_offset;
+
+ if (index_created) {
+ /*
+ * Try to add the relay index object to the hash table. If an object
+ * already exist, destroy back the index created and set the data.
+ */
+ relay_index_add(index, &wr_index);
+ if (wr_index) {
+ /* Copy back data from the created index. */
+ wr_index->fd = index->fd;
+ wr_index->to_close_fd = index->to_close_fd;
+ wr_index->index_data.offset = data_offset;
+ free(index);
+ }
+ } else {
+ /* The index already exists so write it on disk. */
+ wr_index = index;
+ }
+
+ /* Do we have a writable ready index to write on disk. */
+ if (wr_index) {
+ ret = relay_index_write(wr_index->fd, wr_index);
+ if (ret < 0) {
+ goto error;
+ }
+ stream->total_index_received++;
+ }
+
+error:
+ return ret;
+}
+
/*
* relay_process_data: Process the data received on the data socket
*/
static
-int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht)
+int relay_process_data(struct relay_command *cmd)
{
- int ret = 0, rotate_index = 0, index_created = 0;
+ int ret = 0, rotate_index = 0;
struct relay_stream *stream;
- struct relay_index *index, *wr_index = NULL;
struct lttcomm_relayd_data_hdr data_hdr;
- uint64_t stream_id, data_offset;
+ uint64_t stream_id;
uint64_t net_seq_num;
uint32_t data_size;
rotate_index = 1;
}
- /* Get data offset because we are about to update the index. */
- data_offset = htobe64(stream->tracefile_size_current);
-
/*
- * Lookup for an existing index for that stream id/sequence number. If on
- * exists, the control thread already received the data for it thus we need
- * to write it on disk.
+ * Index are handled in protocol version 2.4 and above. Also, snapshot and
+ * index are NOT supported.
*/
- index = relay_index_find(stream_id, net_seq_num, indexes_ht);
- if (!index) {
- /* A successful creation will add the object to the HT. */
- index = relay_index_create(stream->stream_handle, net_seq_num);
- if (!index) {
- goto end_rcu_unlock;
- }
- index_created = 1;
- }
-
- if (rotate_index || stream->index_fd < 0) {
- index->to_close_fd = stream->index_fd;
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- /* This will close the stream's index fd if one. */
- relay_index_free_safe(index);
- goto end_rcu_unlock;
- }
- stream->index_fd = ret;
- }
- index->fd = stream->index_fd;
- index->index_data.offset = data_offset;
-
- if (index_created) {
- /*
- * Try to add the relay index object to the hash table. If an object
- * already exist, destroy back the index created and set the data.
- */
- relay_index_add(index, indexes_ht, &wr_index);
- if (wr_index) {
- /* Copy back data from the created index. */
- wr_index->fd = index->fd;
- wr_index->to_close_fd = index->to_close_fd;
- wr_index->index_data.offset = data_offset;
- free(index);
- }
- } else {
- /* The index already exists so write it on disk. */
- wr_index = index;
- }
-
- /* Do we have a writable ready index to write on disk. */
- if (wr_index) {
- /* Starting at 2.4, create the index file if none available. */
- if (cmd->minor >= 4 && stream->index_fd < 0) {
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
- stream->index_fd = ret;
- }
-
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ if (stream->session->minor >= 4 && !stream->session->snapshot) {
+ ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
goto end_rcu_unlock;
}
- stream->total_index_received++;
}
+ /* Write data to stream output fd. */
do {
ret = write(stream->fd, data_buffer, data_size);
} while (ret < 0 && errno == EINTR);
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- int cret;
- struct lttng_ht_iter iter;
-
- cret = close(stream->fd);
- if (cret < 0) {
- PERROR("close stream process data");
- }
-
- cret = close(stream->index_fd);
- if (cret < 0) {
- PERROR("close stream index_fd");
- }
- iter.iter.node = &stream->stream_n.node;
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d after recv data", stream->fd);
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
end_rcu_unlock:
rcu_register_thread();
+ health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
+
+ health_code_update();
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+ health_code_update();
+
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
if (ret < 0) {
/*
* Restart interrupted system call.
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ health_code_update();
+
if (last_seen_data_fd == pollfd) {
idx = i;
break;
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht);
+ ret = relay_process_data(relay_connection);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
last_seen_data_fd = -1;
}
+ /* Normal exit, no error */
+ ret = 0;
+
exit:
error:
lttng_poll_clean(&events);
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+ health_code_update();
+
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
}
DBG("Worker thread cleanup complete");
free(data_buffer);
- stop_threads();
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);
rcu_unregister_thread();
+ stop_threads();
return NULL;
}
}
/* tables of streams indexed by stream ID */
- relay_ctx->viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!relay_ctx->viewer_streams_ht) {
+ viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!viewer_streams_ht) {
goto exit_relay_ctx_viewer_streams;
}
+ /* Initialize thread health monitoring */
+ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
+ if (!health_relayd) {
+ PERROR("health_app_create error");
+ goto exit_health_app_create;
+ }
+
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
goto exit_listener;
}
- ret = live_start_threads(live_uri, relay_ctx);
+ ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe);
if (ret != 0) {
ERR("Starting live viewer threads");
+ goto exit_live;
}
-exit_listener:
+ live_stop_threads();
+
+exit_live:
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
-exit_worker:
+exit_listener:
ret = pthread_join(worker_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
-exit_dispatcher:
+exit_worker:
ret = pthread_join(dispatcher_thread, &status);
if (ret != 0) {
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
- lttng_ht_destroy(relay_ctx->viewer_streams_ht);
+
+exit_dispatcher:
+ health_app_destroy(health_relayd);
+
+exit_health_app_create:
+ lttng_ht_destroy(viewer_streams_ht);
exit_relay_ctx_viewer_streams:
lttng_ht_destroy(relay_streams_ht);
free(relay_ctx);
exit:
- live_stop_threads();
cleanup();
if (!ret) {
exit(EXIT_SUCCESS);