#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <inttypes.h>
#include <sys/mman.h>
#include <sys/mount.h>
#include <sys/resource.h>
#include <common/utils.h>
#include "lttng-sessiond.h"
+#include "buffer-registry.h"
#include "channel.h"
#include "cmd.h"
#include "consumer.h"
.cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH,
.err_sock = -1,
.cmd_sock = -1,
+ .metadata_sock.fd = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
.cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH,
.err_sock = -1,
.cmd_sock = -1,
+ .metadata_sock.fd = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
.cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH,
.err_sock = -1,
.cmd_sock = -1,
+ .metadata_sock.fd = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
static pthread_t kernel_thread;
static pthread_t dispatch_thread;
static pthread_t health_thread;
+static pthread_t ht_cleanup_thread;
/*
* UST registration command queue. This queue is tied with a futex and uses a N
*/
static int app_socket_timeout;
+/* Set in main() with the current page size. */
+long page_size;
+
static
void setup_consumerd_path(void)
{
DBG("Closing all UST sockets");
ust_app_clean_list();
+ buffer_reg_destroy_registries();
if (is_root && !opt_no_kernel) {
DBG2("Closing kernel fd");
/*
* This first step of the while is to clean this structure which could free
- * non NULL pointers so zero it before the loop.
+ * non NULL pointers so initialize it before the loop.
*/
- memset(&events, 0, sizeof(events));
+ lttng_poll_init(&events);
if (testpoint(thread_manage_kernel)) {
goto error_testpoint;
health_code_update();
/*
- * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock.
- * Nothing more will be added to this poll set.
+ * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
+ * metadata_sock. Nothing more will be added to this poll set.
*/
- ret = sessiond_set_thread_pollset(&events, 2);
+ ret = sessiond_set_thread_pollset(&events, 3);
if (ret < 0) {
goto error_poll;
}
health_code_update();
- /* Inifinite blocking call, waiting for transmission */
+ /* Infinite blocking call, waiting for transmission */
restart:
health_poll_entry();
health_code_update();
if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
+ /* Connect both socket, command and metadata. */
consumer_data->cmd_sock =
lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
- if (consumer_data->cmd_sock < 0) {
+ consumer_data->metadata_sock.fd =
+ lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
+ if (consumer_data->cmd_sock < 0 ||
+ consumer_data->metadata_sock.fd < 0) {
+ PERROR("consumer connect cmd socket");
/* On error, signal condition and quit. */
signal_consumer_condition(consumer_data, -1);
- PERROR("consumer connect");
goto error;
}
+ /* Create metadata socket lock. */
+ consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
+ if (consumer_data->metadata_sock.lock == NULL) {
+ PERROR("zmalloc pthread mutex");
+ ret = -1;
+ goto error;
+ }
+ pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
+
signal_consumer_condition(consumer_data, 1);
- DBG("Consumer command socket ready");
+ DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
+ DBG("Consumer metadata socket ready (fd: %d)",
+ consumer_data->metadata_sock.fd);
} else {
ERR("consumer error when waiting for SOCK_READY : %s",
lttcomm_get_readable_code(-code));
goto error;
}
- /* Remove the kconsumerd error sock since we've established a connexion */
+ /* Remove the consumerd error sock since we've established a connexion */
ret = lttng_poll_del(&events, consumer_data->err_sock);
if (ret < 0) {
goto error;
}
+ /* Add new accepted error socket. */
ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
if (ret < 0) {
goto error;
}
+ /* Add metadata socket that is successfully connected. */
+ ret = lttng_poll_add(&events, consumer_data->metadata_sock.fd,
+ LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
+ goto error;
+ }
+
health_code_update();
- /* Inifinite blocking call, waiting for transmission */
+ /* Infinite blocking call, waiting for transmission */
restart_poll:
- health_poll_entry();
- ret = lttng_poll_wait(&events, -1);
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart_poll;
+ while (1) {
+ health_poll_entry();
+ ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
+ if (ret < 0) {
+ /*
+ * Restart interrupted system call.
+ */
+ if (errno == EINTR) {
+ goto restart_poll;
+ }
+ goto error;
}
- goto error;
- }
- nb_fd = ret;
+ nb_fd = ret;
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
- health_code_update();
+ health_code_update();
- /* Thread quit pipe has been closed. Killing thread. */
- ret = sessiond_check_thread_quit_pipe(pollfd, revents);
- if (ret) {
- err = 0;
- goto exit;
- }
+ /* Thread quit pipe has been closed. Killing thread. */
+ ret = sessiond_check_thread_quit_pipe(pollfd, revents);
+ if (ret) {
+ err = 0;
+ goto exit;
+ }
- /* Event on the kconsumerd socket */
- if (pollfd == sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("consumer err socket second poll error");
+ if (pollfd == sock) {
+ /* Event on the consumerd socket */
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("consumer err socket second poll error");
+ goto error;
+ }
+ health_code_update();
+ /* Wait for any kconsumerd error */
+ ret = lttcomm_recv_unix_sock(sock, &code,
+ sizeof(enum lttcomm_return_code));
+ if (ret <= 0) {
+ ERR("consumer closed the command socket");
+ goto error;
+ }
+
+ ERR("consumer return code : %s",
+ lttcomm_get_readable_code(-code));
+
+ goto exit;
+ } else if (pollfd == consumer_data->metadata_sock.fd) {
+ /* UST metadata requests */
+ ret = ust_consumer_metadata_request(
+ &consumer_data->metadata_sock);
+ if (ret < 0) {
+ ERR("Handling metadata request");
+ goto error;
+ }
+ break;
+ } else {
+ ERR("Unknown pollfd");
goto error;
}
}
+ health_code_update();
}
- health_code_update();
-
- /* Wait for any kconsumerd error */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
- if (ret <= 0) {
- ERR("consumer closed the command socket");
- goto error;
- }
-
- ERR("consumer return code : %s", lttcomm_get_readable_code(-code));
-
exit:
error:
/* Immediately set the consumerd state to stopped */
PERROR("close");
}
}
+ if (consumer_data->metadata_sock.fd >= 0) {
+ ret = close(consumer_data->metadata_sock.fd);
+ if (ret) {
+ PERROR("close");
+ }
+ }
+ /* Cleanup metadata socket mutex. */
+ pthread_mutex_destroy(consumer_data->metadata_sock.lock);
+ free(consumer_data->metadata_sock.lock);
+
if (sock >= 0) {
ret = close(sock);
if (ret) {
return ret;
}
+/*
+ * Sanitize the wait queue of the dispatch registration thread meaning removing
+ * invalid nodes from it. This is to avoid memory leaks for the case the UST
+ * notify socket is never received.
+ */
+static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
+{
+ int ret, nb_fd = 0, i;
+ unsigned int fd_added = 0;
+ struct lttng_poll_event events;
+ struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
+
+ assert(wait_queue);
+
+ lttng_poll_init(&events);
+
+ /* Just skip everything for an empty queue. */
+ if (!wait_queue->count) {
+ goto end;
+ }
+
+ ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
+ if (ret < 0) {
+ goto error_create;
+ }
+
+ cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+ &wait_queue->head, head) {
+ assert(wait_node->app);
+ ret = lttng_poll_add(&events, wait_node->app->sock,
+ LPOLLHUP | LPOLLERR);
+ if (ret < 0) {
+ goto error;
+ }
+
+ fd_added = 1;
+ }
+
+ if (!fd_added) {
+ goto end;
+ }
+
+ /*
+ * Poll but don't block so we can quickly identify the faulty events and
+ * clean them afterwards from the wait queue.
+ */
+ ret = lttng_poll_wait(&events, 0);
+ if (ret < 0) {
+ goto error;
+ }
+ nb_fd = ret;
+
+ for (i = 0; i < nb_fd; i++) {
+ /* Get faulty FD. */
+ uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+ &wait_queue->head, head) {
+ if (pollfd == wait_node->app->sock &&
+ (revents & (LPOLLHUP | LPOLLERR))) {
+ cds_list_del(&wait_node->head);
+ wait_queue->count--;
+ ust_app_destroy(wait_node->app);
+ free(wait_node);
+ break;
+ }
+ }
+ }
+
+ if (nb_fd > 0) {
+ DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
+ }
+
+end:
+ lttng_poll_clean(&events);
+ return;
+
+error:
+ lttng_poll_clean(&events);
+error_create:
+ ERR("Unable to sanitize wait queue");
+ return;
+}
+
/*
* Dispatch request from the registration threads to the application
* communication thread.
*/
static void *thread_dispatch_ust_registration(void *data)
{
- int ret;
+ int ret, err = -1;
struct cds_wfq_node *node;
struct ust_command *ust_cmd = NULL;
- struct {
- struct ust_app *app;
- struct cds_list_head head;
- } *wait_node = NULL, *tmp_wait_node;
+ struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
+ struct ust_reg_wait_queue wait_queue = {
+ .count = 0,
+ };
+
+ health_register(HEALTH_TYPE_APP_REG_DISPATCH);
- CDS_LIST_HEAD(wait_queue);
+ health_code_update();
+
+ CDS_INIT_LIST_HEAD(&wait_queue.head);
DBG("[thread] Dispatch UST command started");
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+ health_code_update();
+
/* Atomically prepare the queue futex */
futex_nto1_prepare(&ust_cmd_queue.futex);
do {
struct ust_app *app = NULL;
+ ust_cmd = NULL;
+
+ /*
+ * Make sure we don't have node(s) that have hung up before receiving
+ * the notify socket. This is to clean the list in order to avoid
+ * memory leaks from notify socket that are never seen.
+ */
+ sanitize_wait_queue(&wait_queue);
+ health_code_update();
/* Dequeue command for registration */
node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue);
if (node == NULL) {
wait_node = zmalloc(sizeof(*wait_node));
if (!wait_node) {
PERROR("zmalloc wait_node dispatch");
+ ret = close(ust_cmd->sock);
+ if (ret < 0) {
+ PERROR("close ust sock dispatch %d", ust_cmd->sock);
+ }
+ lttng_fd_put(1, LTTNG_FD_APPS);
+ free(ust_cmd);
goto error;
}
CDS_INIT_LIST_HEAD(&wait_node->head);
}
lttng_fd_put(1, LTTNG_FD_APPS);
free(wait_node);
+ free(ust_cmd);
continue;
}
/*
* Add application to the wait queue so we can set the notify
* socket before putting this object in the global ht.
*/
- cds_list_add(&wait_node->head, &wait_queue);
+ cds_list_add(&wait_node->head, &wait_queue.head);
+ wait_queue.count++;
+ free(ust_cmd);
/*
* We have to continue here since we don't have the notify
* socket and the application MUST be added to the hash table
* notify socket if found.
*/
cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
- &wait_queue, head) {
+ &wait_queue.head, head) {
+ health_code_update();
if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
wait_node->app->notify_sock = ust_cmd->sock;
cds_list_del(&wait_node->head);
+ wait_queue.count--;
app = wait_node->app;
free(wait_node);
DBG3("UST app notify socket %d is set", ust_cmd->sock);
break;
}
}
+
+ /*
+ * With no application at this stage the received socket is
+ * basically useless so close it before we free the cmd data
+ * structure for good.
+ */
+ if (!app) {
+ ret = close(ust_cmd->sock);
+ if (ret < 0) {
+ PERROR("close ust sock dispatch %d", ust_cmd->sock);
+ }
+ lttng_fd_put(1, LTTNG_FD_APPS);
+ }
+ free(ust_cmd);
}
if (app) {
rcu_read_unlock();
session_unlock_list();
- } else {
- /* Application manager threads are not available. */
- ret = close(ust_cmd->sock);
- if (ret < 0) {
- PERROR("close ust_cmd sock");
- }
- lttng_fd_put(1, LTTNG_FD_APPS);
}
- free(ust_cmd);
} while (node != NULL);
+ health_poll_entry();
/* Futex wait on queue. Blocking call on futex() */
futex_nto1_wait(&ust_cmd_queue.futex);
+ health_poll_exit();
}
+ /* Normal exit, no error */
+ err = 0;
error:
/* Clean up wait queue. */
cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
- &wait_queue, head) {
+ &wait_queue.head, head) {
cds_list_del(&wait_node->head);
+ wait_queue.count--;
free(wait_node);
}
DBG("Dispatch thread dying");
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister();
return NULL;
}
ret = putenv(tmpnew);
if (ret) {
ret = -errno;
+ free(tmpnew);
goto error;
}
}
ret = putenv(tmpnew);
if (ret) {
ret = -errno;
+ free(tmpnew);
goto error;
}
}
return 0;
error:
- /* Cleanup already created socket on error. */
+ /* Cleanup already created sockets on error. */
if (consumer_data->err_sock >= 0) {
int err;
* Copy consumer output from the tracing session to the domain session. The
* function also applies the right modification on a per domain basis for the
* trace files destination directory.
+ *
+ * Should *NOT* be called with RCU read-side lock held.
*/
static int copy_session_consumer(int domain, struct ltt_session *session)
{
/*
* Create an UST session and add it to the session ust list.
+ *
+ * Should *NOT* be called with RCU read-side lock held.
*/
static int create_ust_session(struct ltt_session *session,
struct lttng_domain *domain)
DBG("Creating UST session");
- lus = trace_ust_create_session(session->path, session->id);
+ lus = trace_ust_create_session(session->id);
if (lus == NULL) {
ret = LTTNG_ERR_UST_SESS_FAIL;
goto error;
* Return any error encountered or 0 for success.
*
* "sock" is only used for special-case var. len data.
+ *
+ * Should *NOT* be called with RCU read-side lock held.
*/
static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
int *sock_error)
}
case LTTNG_ENABLE_CHANNEL:
{
- ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type,
+ ret = cmd_enable_channel(cmd_ctx->session, &cmd_ctx->lsm->domain,
&cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]);
break;
}
case LTTNG_ENABLE_EVENT:
{
- ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type,
+ ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain,
cmd_ctx->lsm->u.enable.channel_name,
&cmd_ctx->lsm->u.enable.event, NULL, kernel_poll_pipe[1]);
break;
{
DBG("Enabling all events");
- ret = cmd_enable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type,
+ ret = cmd_enable_event_all(cmd_ctx->session, &cmd_ctx->lsm->domain,
cmd_ctx->lsm->u.enable.channel_name,
cmd_ctx->lsm->u.enable.event.type, NULL, kernel_poll_pipe[1]);
break;
ret = setup_lttng_msg(cmd_ctx, nb_dom * sizeof(struct lttng_domain));
if (ret < 0) {
+ free(domains);
goto setup_error;
}
ret = setup_lttng_msg(cmd_ctx, nb_chan * sizeof(struct lttng_channel));
if (ret < 0) {
+ free(channels);
goto setup_error;
}
ret = setup_lttng_msg(cmd_ctx, nb_event * sizeof(struct lttng_event));
if (ret < 0) {
+ free(events);
goto setup_error;
}
goto error;
}
- ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type,
+ ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain,
cmd_ctx->lsm->u.enable.channel_name,
&cmd_ctx->lsm->u.enable.event, bytecode, kernel_poll_pipe[1]);
break;
rcu_register_thread();
+ /* We might hit an error path before this is created. */
+ lttng_poll_init(&events);
+
/* Create unix socket */
sock = lttcomm_create_unix_sock(health_unix_sock_path);
if (sock < 0) {
case LTTNG_HEALTH_CONSUMER:
reply.ret_code = check_consumer_health();
break;
+ case LTTNG_HEALTH_HT_CLEANUP:
+ reply.ret_code = health_check_state(HEALTH_TYPE_HT_CLEANUP);
+ break;
+ case LTTNG_HEALTH_APP_MANAGE_NOTIFY:
+ reply.ret_code = health_check_state(HEALTH_TYPE_APP_MANAGE_NOTIFY);
+ break;
+ case LTTNG_HEALTH_APP_REG_DISPATCH:
+ reply.ret_code = health_check_state(HEALTH_TYPE_APP_REG_DISPATCH);
+ break;
case LTTNG_HEALTH_ALL:
reply.ret_code =
health_check_state(HEALTH_TYPE_APP_MANAGE) &&
health_check_state(HEALTH_TYPE_APP_REG) &&
health_check_state(HEALTH_TYPE_CMD) &&
health_check_state(HEALTH_TYPE_KERNEL) &&
- check_consumer_health();
+ check_consumer_health() &&
+ health_check_state(HEALTH_TYPE_HT_CLEANUP) &&
+ health_check_state(HEALTH_TYPE_APP_MANAGE_NOTIFY) &&
+ health_check_state(HEALTH_TYPE_APP_REG_DISPATCH);
break;
default:
reply.ret_code = LTTNG_ERR_UND;
PERROR("close");
}
}
- if (new_sock >= 0) {
- ret = close(new_sock);
- if (ret) {
- PERROR("close");
- }
- }
lttng_poll_clean(&events);
ret = process_client_msg(cmd_ctx, sock, &sock_error);
rcu_thread_offline();
if (ret < 0) {
- if (sock_error) {
- ret = close(sock);
- if (ret) {
- PERROR("close");
- }
- sock = -1;
+ ret = close(sock);
+ if (ret) {
+ PERROR("close");
}
+ sock = -1;
/*
* TODO: Inform client somehow of the fatal error. At
* this point, ret < 0 means that a zmalloc failed
setup_consumerd_path();
+ page_size = sysconf(_SC_PAGESIZE);
+ if (page_size < 0) {
+ PERROR("sysconf _SC_PAGESIZE");
+ page_size = LONG_MAX;
+ WARN("Fallback page size to %ld", page_size);
+ }
+
/* Parse arguments */
progname = argv[0];
if ((ret = parse_args(argc, argv)) < 0) {
}
}
+ /* Setup the thread ht_cleanup communication pipe. */
+ if (utils_create_pipe_cloexec(ht_cleanup_pipe) < 0) {
+ goto exit;
+ }
+
/* Setup the thread apps communication pipe. */
if ((ret = utils_create_pipe_cloexec(apps_cmd_pipe)) < 0) {
goto exit;
goto exit;
}
+ /* Initialize global buffer per UID and PID registry. */
+ buffer_reg_init_uid_registry();
+ buffer_reg_init_pid_registry();
+
/* Init UST command queue. */
cds_wfq_init(&ust_cmd_queue.queue);
write_pidfile();
+ /* Create thread to manage the client socket */
+ ret = pthread_create(&ht_cleanup_thread, NULL,
+ thread_ht_cleanup, (void *) NULL);
+ if (ret != 0) {
+ PERROR("pthread_create ht_cleanup");
+ goto exit_ht_cleanup;
+ }
+
/* Create thread to manage the client socket */
ret = pthread_create(&health_thread, NULL,
thread_manage_health, (void *) NULL);
}
exit_health:
+ ret = pthread_join(ht_cleanup_thread, &status);
+ if (ret != 0) {
+ PERROR("pthread_join ht cleanup thread");
+ goto error; /* join error, exit without cleanup */
+ }
+exit_ht_cleanup:
exit:
/*
* cleanup() is called when no other thread is running.