X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=ed78b5622911cfc75517814f32a17de7a1738c4d;hb=76f3e2db72941aa4d234ac4aef087222d30b77b7;hp=007c722921700e351cfedd6441fc31a16a9683b3;hpb=12e2b88170b3bf7a55beb692c717470752ad51eb;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 007c72292..ed78b5622 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -236,6 +236,9 @@ static int app_socket_timeout; /* Set in main() with the current page size. */ long page_size; +/* If set to nonzero, spawn the consumer with valgrind. */ +static int consumer_debug_valgrind; + static void setup_consumerd_path(void) { @@ -646,7 +649,8 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) pthread_mutex_lock(socket->lock); ret = kernel_consumer_send_channel_stream(socket, - channel, ksess); + channel, ksess, + session->output_traces ? 1 : 0); pthread_mutex_unlock(socket->lock); if (ret < 0) { rcu_read_unlock(); @@ -1334,6 +1338,91 @@ error: return ret; } +/* + * Sanitize the wait queue of the dispatch registration thread meaning removing + * invalid nodes from it. This is to avoid memory leaks for the case the UST + * notify socket is never received. + */ +static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) +{ + int ret, nb_fd = 0, i; + unsigned int fd_added = 0; + struct lttng_poll_event events; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + + assert(wait_queue); + + lttng_poll_init(&events); + + /* Just skip everything for an empty queue. */ + if (!wait_queue->count) { + goto end; + } + + ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); + if (ret < 0) { + goto error_create; + } + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + assert(wait_node->app); + ret = lttng_poll_add(&events, wait_node->app->sock, + LPOLLHUP | LPOLLERR); + if (ret < 0) { + goto error; + } + + fd_added = 1; + } + + if (!fd_added) { + goto end; + } + + /* + * Poll but don't block so we can quickly identify the faulty events and + * clean them afterwards from the wait queue. + */ + ret = lttng_poll_wait(&events, 0); + if (ret < 0) { + goto error; + } + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Get faulty FD. */ + uint32_t revents = LTTNG_POLL_GETEV(&events, i); + int pollfd = LTTNG_POLL_GETFD(&events, i); + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + if (pollfd == wait_node->app->sock && + (revents & (LPOLLHUP | LPOLLERR))) { + cds_list_del(&wait_node->head); + wait_queue->count--; + ust_app_destroy(wait_node->app); + free(wait_node); + break; + } + } + } + + if (nb_fd > 0) { + DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); + } + +end: + lttng_poll_clean(&events); + return; + +error: + lttng_poll_clean(&events); +error_create: + ERR("Unable to sanitize wait queue"); + return; +} + /* * Dispatch request from the registration threads to the application * communication thread. @@ -1343,16 +1432,16 @@ static void *thread_dispatch_ust_registration(void *data) int ret, err = -1; struct cds_wfq_node *node; struct ust_command *ust_cmd = NULL; - struct { - struct ust_app *app; - struct cds_list_head head; - } *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_queue wait_queue = { + .count = 0, + }; health_register(HEALTH_TYPE_APP_REG_DISPATCH); health_code_update(); - CDS_LIST_HEAD(wait_queue); + CDS_INIT_LIST_HEAD(&wait_queue.head); DBG("[thread] Dispatch UST command started"); @@ -1366,6 +1455,13 @@ static void *thread_dispatch_ust_registration(void *data) struct ust_app *app = NULL; ust_cmd = NULL; + /* + * Make sure we don't have node(s) that have hung up before receiving + * the notify socket. This is to clean the list in order to avoid + * memory leaks from notify socket that are never seen. + */ + sanitize_wait_queue(&wait_queue); + health_code_update(); /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); @@ -1415,7 +1511,8 @@ static void *thread_dispatch_ust_registration(void *data) * Add application to the wait queue so we can set the notify * socket before putting this object in the global ht. */ - cds_list_add(&wait_node->head, &wait_queue); + cds_list_add(&wait_node->head, &wait_queue.head); + wait_queue.count++; free(ust_cmd); /* @@ -1430,11 +1527,12 @@ static void *thread_dispatch_ust_registration(void *data) * notify socket if found. */ cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue, head) { + &wait_queue.head, head) { health_code_update(); if (wait_node->app->pid == ust_cmd->reg_msg.pid) { wait_node->app->notify_sock = ust_cmd->sock; cds_list_del(&wait_node->head); + wait_queue.count--; app = wait_node->app; free(wait_node); DBG3("UST app notify socket %d is set", ust_cmd->sock); @@ -1529,8 +1627,9 @@ static void *thread_dispatch_ust_registration(void *data) error: /* Clean up wait queue. */ cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue, head) { + &wait_queue.head, head) { cds_list_del(&wait_node->head); + wait_queue.count--; free(wait_node); } @@ -1936,11 +2035,21 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) break; } DBG("Using kernel consumer at: %s", consumer_to_use); - execl(consumer_to_use, - "lttng-consumerd", verbosity, "-k", - "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, - "--consumerd-err-sock", consumer_data->err_unix_sock_path, - NULL); + if (consumer_debug_valgrind) { + execl("/usr/bin/valgrind", + "valgrind", "--leak-check=full", "--show-reachable=yes", + "--tool=memcheck", "--track-fds=yes", + "--log-file=/tmp/valgrind.kconsumer.log", + consumer_to_use, verbosity, "-k", + "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, + "--consumerd-err-sock", consumer_data->err_unix_sock_path, + NULL); + } else { + execl(consumer_to_use, "lttng-consumerd", verbosity, "-k", + "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, + "--consumerd-err-sock", consumer_data->err_unix_sock_path, + NULL); + } break; case LTTNG_CONSUMER64_UST: { @@ -1975,10 +2084,21 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) } } DBG("Using 64-bit UST consumer at: %s", consumerd64_bin); - ret = execl(consumerd64_bin, "lttng-consumerd", verbosity, "-u", - "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, - "--consumerd-err-sock", consumer_data->err_unix_sock_path, - NULL); + if (consumer_debug_valgrind) { + ret = execl("/usr/bin/valgrind", + "valgrind", "--leak-check=full", "--show-reachable=yes", + "--tool=memcheck", "--track-fds=yes", + "--log-file=/tmp/valgrind.ust64consumer.log", + consumerd64_bin, verbosity, "-u", + "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, + "--consumerd-err-sock", consumer_data->err_unix_sock_path, + NULL); + } else { + ret = execl(consumerd64_bin, "lttng-consumerd", verbosity, "-u", + "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, + "--consumerd-err-sock", consumer_data->err_unix_sock_path, + NULL); + } if (consumerd64_libdir[0] != '\0') { free(tmpnew); } @@ -2020,10 +2140,21 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) } } DBG("Using 32-bit UST consumer at: %s", consumerd32_bin); - ret = execl(consumerd32_bin, "lttng-consumerd", verbosity, "-u", - "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, - "--consumerd-err-sock", consumer_data->err_unix_sock_path, - NULL); + if (consumer_debug_valgrind) { + ret = execl("/usr/bin/valgrind", + "valgrind", "--leak-check=full", "--show-reachable=yes", + "--tool=memcheck", "--track-fds=yes", + "--log-file=/tmp/valgrind.ust32consumer.log", + consumerd32_bin, verbosity, "-u", + "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, + "--consumerd-err-sock", consumer_data->err_unix_sock_path, + NULL); + } else { + ret = execl(consumerd32_bin, "lttng-consumerd", verbosity, "-u", + "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path, + "--consumerd-err-sock", consumer_data->err_unix_sock_path, + NULL); + } if (consumerd32_libdir[0] != '\0') { free(tmpnew); } @@ -2282,6 +2413,7 @@ static int create_ust_session(struct ltt_session *session, lus->uid = session->uid; lus->gid = session->gid; + lus->output_traces = session->output_traces; session->ust_session = lus; /* Copy session output to the newly created UST session */ @@ -2338,6 +2470,7 @@ static int create_kernel_session(struct ltt_session *session) session->kernel_session->uid = session->uid; session->kernel_session->gid = session->gid; + session->kernel_session->output_traces = session->output_traces; return LTTNG_OK; @@ -2399,6 +2532,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, case LTTNG_START_TRACE: case LTTNG_STOP_TRACE: case LTTNG_DATA_PENDING: + case LTTNG_SNAPSHOT_ADD_OUTPUT: + case LTTNG_SNAPSHOT_DEL_OUTPUT: + case LTTNG_SNAPSHOT_LIST_OUTPUT: + case LTTNG_SNAPSHOT_RECORD: need_domain = 0; break; default: @@ -2467,12 +2604,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, session_lock_list(); cmd_ctx->session = session_find_by_name(cmd_ctx->lsm->session.name); if (cmd_ctx->session == NULL) { - if (cmd_ctx->lsm->session.name != NULL) { - ret = LTTNG_ERR_SESS_NOT_FOUND; - } else { - /* If no session name specified */ - ret = LTTNG_ERR_SELECT_SESS; - } + ret = LTTNG_ERR_SESS_NOT_FOUND; goto error; } else { /* Acquire lock for the session */ @@ -3086,6 +3218,67 @@ skip_domain: ret = cmd_data_pending(cmd_ctx->session); break; } + case LTTNG_SNAPSHOT_ADD_OUTPUT: + { + struct lttcomm_lttng_output_id reply; + + ret = cmd_snapshot_add_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output, &reply.id); + if (ret != LTTNG_OK) { + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, sizeof(reply)); + if (ret < 0) { + goto setup_error; + } + + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, &reply, sizeof(reply)); + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_DEL_OUTPUT: + { + ret = cmd_snapshot_del_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output); + break; + } + case LTTNG_SNAPSHOT_LIST_OUTPUT: + { + ssize_t nb_output; + struct lttng_snapshot_output *outputs = NULL; + + nb_output = cmd_snapshot_list_outputs(cmd_ctx->session, &outputs); + if (nb_output < 0) { + ret = -nb_output; + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, + nb_output * sizeof(struct lttng_snapshot_output)); + if (ret < 0) { + free(outputs); + goto setup_error; + } + + if (outputs) { + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, outputs, + nb_output * sizeof(struct lttng_snapshot_output)); + free(outputs); + } + + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_RECORD: + { + ret = cmd_snapshot_record(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_record.output, + cmd_ctx->lsm->u.snapshot_record.wait); + break; + } default: ret = LTTNG_ERR_UND; break; @@ -4117,6 +4310,13 @@ int main(int argc, char **argv) } } + /* Valgrind env. variable setup. */ + if (getenv(DEFAULT_CONSUMER_DEBUG_VALGRIND_ENV)) { + consumer_debug_valgrind = 1; + /* Valgrind does not support clone(). */ + setenv("LTTNG_DEBUG_NOCLONE", "1", 1); + } + /* Create thread quit pipe */ if ((ret = init_thread_quit_pipe()) < 0) { goto error; @@ -4166,7 +4366,7 @@ int main(int argc, char **argv) DBG2("Kernel consumer cmd path: %s", kconsumer_data.cmd_unix_sock_path); } else { - home_path = get_home_dir(); + home_path = utils_get_home_dir(); if (home_path == NULL) { /* TODO: Add --socket PATH option */ ERR("Can't get HOME directory for sockets creation.");