return ret;
}
+/*
+ * Wait on consumer process termination.
+ *
+ * Need to be called with the consumer data lock held or from a context
+ * ensuring no concurrent access to data (e.g: cleanup).
+ */
+static void wait_consumer(struct consumer_data *consumer_data)
+{
+ pid_t ret;
+ int status;
+
+ if (consumer_data->pid <= 0) {
+ return;
+ }
+
+ DBG("Waiting for complete teardown of consumerd (PID: %d)",
+ consumer_data->pid);
+ ret = waitpid(consumer_data->pid, &status, 0);
+ if (ret == -1) {
+ PERROR("consumerd waitpid pid: %d", consumer_data->pid)
+ }
+ if (!WIFEXITED(status)) {
+ ERR("consumerd termination with error: %d",
+ WEXITSTATUS(ret));
+ }
+ consumer_data->pid = 0;
+}
+
/*
* Cleanup the session daemon's data structures.
*/
}
}
+ wait_consumer(&kconsumer_data);
+ wait_consumer(&ustconsumer64_data);
+ wait_consumer(&ustconsumer32_data);
+
DBG("Cleaning up all agent apps");
agent_app_ht_clean();
free(kmod_probes_list);
free(kmod_extra_probes_list);
+ run_as_destroy_worker();
+
/* <fun> */
DBG("%c[%d;%dm*** assert failed :-) *** ==> %c[%dm%c[%d;%dm"
"Matthew, BEET driven development works!%c[%dm",
}
/* Check for data on kernel pipe */
- if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) {
- (void) lttng_read(kernel_poll_pipe[0],
- &tmp, 1);
- /*
- * Ret value is useless here, if this pipe gets any actions an
- * update is required anyway.
- */
- update_poll_flag = 1;
- continue;
- } else {
- /*
- * New CPU detected by the kernel. Adding kernel stream to
- * kernel session and updating the kernel consumer
- */
- if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
+ if (pollfd == kernel_poll_pipe[0]) {
+ (void) lttng_read(kernel_poll_pipe[0],
+ &tmp, 1);
+ /*
+ * Ret value is useless here, if this pipe gets any actions an
+ * update is required anyway.
+ */
+ update_poll_flag = 1;
+ continue;
+ } else {
+ /*
+ * New CPU detected by the kernel. Adding kernel stream to
+ * kernel session and updating the kernel consumer
+ */
ret = update_kernel_stream(&kconsumer_data, pollfd);
if (ret < 0) {
continue;
}
break;
- /*
- * TODO: We might want to handle the LPOLLERR | LPOLLHUP
- * and unregister kernel stream at this point.
- */
}
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ update_poll_flag = 1;
+ continue;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
DBG("[thread] Manage consumer started");
+ rcu_register_thread();
+ rcu_thread_online();
+
health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
health_code_update();
/* Event on the registration socket */
if (pollfd == consumer_data->err_sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & LPOLLIN) {
+ continue;
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("consumer err socket poll error");
goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
if (pollfd == sock) {
/* Event on the consumerd socket */
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+ && !(revents & LPOLLIN)) {
ERR("consumer err socket second poll error");
goto error;
}
goto exit;
} else if (pollfd == consumer_data->metadata_fd) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+ && !(revents & LPOLLIN)) {
+ ERR("consumer err metadata socket second poll error");
+ goto error;
+ }
/* UST metadata requests */
ret = ust_consumer_metadata_request(
&consumer_data->metadata_sock);
unlink(consumer_data->err_unix_sock_path);
unlink(consumer_data->cmd_unix_sock_path);
- consumer_data->pid = 0;
pthread_mutex_unlock(&consumer_data->lock);
/* Cleanup metadata socket mutex. */
health_unregister(health_sessiond);
DBG("consumer thread cleanup completed");
+ rcu_thread_offline();
+ rcu_unregister_thread();
+
return NULL;
}
/* Inspect the apps cmd pipe */
if (pollfd == apps_cmd_pipe[0]) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Apps command pipe error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
int sock;
/* Empty pipe */
health_code_update();
/*
- * We only monitor the error events of the socket. This
- * thread does not handle any incoming data from UST
- * (POLLIN).
+ * Since this is a command socket (write then read),
+ * we only monitor the error events of the socket.
*/
ret = lttng_poll_add(&events, sock,
LPOLLERR | LPOLLHUP | LPOLLRDHUP);
}
DBG("Apps with sock %d added to poll set", sock);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Apps command pipe error");
+ goto error;
+ } else {
+ ERR("Unknown poll events %u for sock %d", revents, pollfd);
+ goto error;
}
} else {
/*
/* Socket closed on remote end. */
ust_app_unregister(pollfd);
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
ust_app_destroy(wait_node->app);
free(wait_node);
break;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
* Don't care about return value. Let the manage apps threads
* handle app unregistration upon socket close.
*/
- (void) ust_app_register_done(app->sock);
+ (void) ust_app_register_done(app);
/*
* Even if the application socket has been closed, send the app
free(wait_node);
}
+ /* Empty command queue. */
+ for (;;) {
+ /* Dequeue command for registration */
+ node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail);
+ if (node == NULL) {
+ break;
+ }
+ ust_cmd = caa_container_of(node, struct ust_command, node);
+ ret = close(ust_cmd->sock);
+ if (ret < 0) {
+ PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
+ }
+ lttng_fd_put(LTTNG_FD_APPS, 1);
+ free(ust_cmd);
+ }
+
error_testpoint:
DBG("Dispatch thread dying");
if (err) {
/* Event on the registration socket */
if (pollfd == apps_sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Register apps socket poll error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
sock = lttcomm_accept_unix_sock(apps_sock);
if (sock < 0) {
goto error;
* lttcomm_setsockopt_snd_timeout expect msec as
* parameter.
*/
- (void) lttcomm_setsockopt_rcv_timeout(sock,
- app_socket_timeout * 1000);
- (void) lttcomm_setsockopt_snd_timeout(sock,
- app_socket_timeout * 1000);
+ if (app_socket_timeout >= 0) {
+ (void) lttcomm_setsockopt_rcv_timeout(sock,
+ app_socket_timeout * 1000);
+ (void) lttcomm_setsockopt_snd_timeout(sock,
+ app_socket_timeout * 1000);
+ }
/*
* Set the CLOEXEC flag. Return code is useless because
* barrier with the exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&ust_cmd_queue.futex);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Register apps socket poll error");
+ goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
* domain.
*/
if (session->kernel_session->consumer) {
- consumer_destroy_output(session->kernel_session->consumer);
+ consumer_output_put(session->kernel_session->consumer);
}
session->kernel_session->consumer =
consumer_copy_output(session->consumer);
case LTTNG_DOMAIN_UST:
DBG3("Copying tracing session consumer output in UST session");
if (session->ust_session->consumer) {
- consumer_destroy_output(session->ust_session->consumer);
+ consumer_output_put(session->ust_session->consumer);
}
session->ust_session->consumer =
consumer_copy_output(session->consumer);
session->kernel_session->consumer->dst.trace_path,
S_IRWXU | S_IRWXG, session->uid, session->gid);
if (ret < 0) {
- if (ret != -EEXIST) {
+ if (errno != EEXIST) {
ERR("Trace directory creation error");
goto error;
}
DBG("Processing client command %d", cmd_ctx->lsm->cmd_type);
+ assert(!rcu_read_ongoing());
+
*sock_error = 0;
switch (cmd_ctx->lsm->cmd_type) {
case LTTNG_LIST_EVENTS:
case LTTNG_LIST_SYSCALLS:
case LTTNG_LIST_TRACKER_PIDS:
+ case LTTNG_DATA_PENDING:
break;
default:
/* Setup lttng message with no payload */
if (cmd_ctx->lsm->cmd_type == LTTNG_START_TRACE ||
cmd_ctx->lsm->cmd_type == LTTNG_STOP_TRACE) {
switch (cmd_ctx->lsm->domain.type) {
+ case LTTNG_DOMAIN_NONE:
+ break;
case LTTNG_DOMAIN_JUL:
case LTTNG_DOMAIN_LOG4J:
case LTTNG_DOMAIN_PYTHON:
goto error;
}
break;
+ default:
+ ret = LTTNG_ERR_UNKNOWN_DOMAIN;
+ goto error;
}
}
session_unlock_list();
}
init_setup_error:
+ assert(!rcu_read_ongoing());
return ret;
}
/* Event on the registration socket */
if (pollfd == sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & LPOLLIN) {
+ continue;
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("Health socket poll error");
goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
}
lttng_poll_clean(&events);
-
+ stop_threads();
rcu_unregister_thread();
return NULL;
}
/* Event on the registration socket */
if (pollfd == client_sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & LPOLLIN) {
+ continue;
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("Client socket poll error");
goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
{
int ret = 0;
- if (arg && arg[0] == '\0') {
- /*
- * This only happens if the value is read from daemon config
- * file. This means the option requires an argument and the
- * configuration file contains a line such as:
- * my_option =
- */
- ret = -EINVAL;
- goto end;
- }
-
if (string_match(optname, "client-sock") || opt == 'c') {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"-c, --client-sock");
snprintf(client_unix_sock_path, PATH_MAX, "%s", arg);
}
} else if (string_match(optname, "apps-sock") || opt == 'a') {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"-a, --apps-sock");
} else if (string_match(optname, "background") || opt == 'b') {
opt_background = 1;
} else if (string_match(optname, "group") || opt == 'g') {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"-g, --group");
} else if (string_match(optname, "sig-parent") || opt == 'S') {
opt_sig_parent = 1;
} else if (string_match(optname, "kconsumerd-err-sock")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--kconsumerd-err-sock");
snprintf(kconsumer_data.err_unix_sock_path, PATH_MAX, "%s", arg);
}
} else if (string_match(optname, "kconsumerd-cmd-sock")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--kconsumerd-cmd-sock");
snprintf(kconsumer_data.cmd_unix_sock_path, PATH_MAX, "%s", arg);
}
} else if (string_match(optname, "ustconsumerd64-err-sock")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--ustconsumerd64-err-sock");
snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX, "%s", arg);
}
} else if (string_match(optname, "ustconsumerd64-cmd-sock")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--ustconsumerd64-cmd-sock");
snprintf(ustconsumer64_data.cmd_unix_sock_path, PATH_MAX, "%s", arg);
}
} else if (string_match(optname, "ustconsumerd32-err-sock")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--ustconsumerd32-err-sock");
snprintf(ustconsumer32_data.err_unix_sock_path, PATH_MAX, "%s", arg);
}
} else if (string_match(optname, "ustconsumerd32-cmd-sock")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--ustconsumerd32-cmd-sock");
opt_verbose_consumer += 1;
}
} else if (string_match(optname, "consumerd32-path")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--consumerd32-path");
consumerd32_bin_override = 1;
}
} else if (string_match(optname, "consumerd32-libdir")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--consumerd32-libdir");
consumerd32_libdir_override = 1;
}
} else if (string_match(optname, "consumerd64-path")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--consumerd64-path");
consumerd64_bin_override = 1;
}
} else if (string_match(optname, "consumerd64-libdir")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--consumerd64-libdir");
consumerd64_libdir_override = 1;
}
} else if (string_match(optname, "pidfile") || opt == 'p') {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"-p, --pidfile");
}
}
} else if (string_match(optname, "agent-tcp-port")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--agent-tcp-port");
DBG3("Agent TCP port set to non default: %u", agent_tcp_port);
}
} else if (string_match(optname, "load") || opt == 'l') {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"-l, --load");
}
}
} else if (string_match(optname, "kmod-probes")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--kmod-probes");
}
}
} else if (string_match(optname, "extra-kmod-probes")) {
+ if (!arg || *arg == '\0') {
+ ret = -EINVAL;
+ goto end;
+ }
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"--extra-kmod-probes");
static void sighandler(int sig)
{
switch (sig) {
- case SIGPIPE:
- DBG("SIGPIPE caught");
- return;
case SIGINT:
DBG("SIGINT caught");
stop_threads();
return ret;
}
- sa.sa_handler = sighandler;
sa.sa_mask = sigset;
sa.sa_flags = 0;
+
+ sa.sa_handler = sighandler;
if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
return ret;
}
- if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
+ if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
}
- if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) {
+ sa.sa_handler = SIG_IGN;
+ if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
}
assert(rundir);
if (opt_pidfile) {
- strncpy(pidfile_path, opt_pidfile, sizeof(pidfile_path));
+ if (lttng_strncpy(pidfile_path, opt_pidfile, sizeof(pidfile_path))) {
+ ret = -1;
+ goto error;
+ }
} else {
/* Build pidfile path from rundir and opt_pidfile. */
ret = snprintf(pidfile_path, sizeof(pidfile_path), "%s/"
}
}
+ if (run_as_create_worker(argv[0]) < 0) {
+ goto exit_create_run_as_worker_cleanup;
+ }
+
/*
* Starting from here, we can create threads. This needs to be after
* lttng_daemonize due to RCU.
}
exit_reg_apps:
+ /*
+ * Join dispatch thread after joining reg_apps_thread to ensure
+ * we don't leak applications in the queue.
+ */
ret = pthread_join(dispatch_thread, &status);
if (ret) {
errno = ret;
rcu_thread_offline();
rcu_unregister_thread();
+ /*
+ * Ensure all prior call_rcu are done. call_rcu callbacks may push
+ * hash tables to the ht_cleanup thread. Therefore, we ensure that
+ * the queue is empty before shutting down the clean-up thread.
+ */
+ rcu_barrier();
+
ret = notify_thread_pipe(ht_cleanup_quit_pipe[1]);
if (ret < 0) {
ERR("write error on ht_cleanup quit pipe");
health_app_destroy(health_sessiond);
exit_health_sessiond_cleanup:
+exit_create_run_as_worker_cleanup:
exit_options:
sessiond_cleanup_options();
exit_set_signal_handler:
+
if (!retval) {
exit(EXIT_SUCCESS);
} else {