Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
+ health_code_update();
+
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
goto error_sock_control;
+ health_code_update();
+
DBG("Listener accepting connections");
restart:
DBG("Listener accepting connections");
restart:
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
lttcomm_destroy_sock(control_sock);
error_sock_control:
if (err) {
- DBG("Thread exited with error");
+ health_error();
+ ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
}
health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
static
void *relay_thread_dispatcher(void *data)
{
static
void *relay_thread_dispatcher(void *data)
{
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
+ health_code_update();
+
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ health_code_update();
+
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
do {
+ health_code_update();
+
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
/* Dequeue commands */
node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
if (node == NULL) {
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
} while (node != NULL);
/* Futex wait on queue. Blocking call on futex() */
futex_nto1_wait(&relay_cmd_queue.futex);
futex_nto1_wait(&relay_cmd_queue.futex);
+ /* Normal exit, no error */
+ err = 0;
+
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
health_unregister(health_relayd);
DBG("Dispatch thread dying");
stop_threads();
health_unregister(health_relayd);
DBG("Dispatch thread dying");
stop_threads();
health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
+ health_code_update();
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
while (1) {
int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+ health_code_update();
+
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
/* Infinite blocking call, waiting for transmission */
DBG3("Relayd worker thread polling...");
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
if (last_seen_data_fd >= 0) {
for (i = 0; i < nb_fd; i++) {
int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ health_code_update();
+
if (last_seen_data_fd == pollfd) {
idx = i;
break;
if (last_seen_data_fd == pollfd) {
idx = i;
break;
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ health_code_update();
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_cmd_pipe[0]) {
continue;
last_seen_data_fd = -1;
}
last_seen_data_fd = -1;
}
+ /* Normal exit, no error */
+ ret = 0;
+
exit:
error:
lttng_poll_clean(&events);
exit:
error:
lttng_poll_clean(&events);
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
/* empty the hash table and free the memory */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+ health_code_update();
+
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
if (err) {
DBG("Thread exited with error");
}
if (err) {
DBG("Thread exited with error");
}
- health_unregister(health_relayd);
DBG("Worker thread cleanup complete");
free(data_buffer);
DBG("Worker thread cleanup complete");
free(data_buffer);
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_relayd);