X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=b8791553670fad96ca8c720a10c0feb1b37f6b63;hb=584fc280d529740882a24c87a431e547b5acf8e0;hp=48868820e74df4846d800638ce59695319867ad0;hpb=65da3cc731c72486d53d58c9b080f235121177d9;p=lttng-tools.git

diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
index 48868820e..b87915536 100644
--- a/src/bin/lttng-sessiond/main.c
+++ b/src/bin/lttng-sessiond/main.c
@@ -629,6 +629,9 @@ static void cleanup(void)
 		}
 	}
 
+	DBG("Cleaning up all agent apps");
+	agent_app_ht_clean();
+
 	DBG("Closing all UST sockets");
 	ust_app_clean_list();
 	buffer_reg_destroy_registries();
@@ -1048,31 +1051,33 @@ static void *thread_manage_kernel(void *data)
 			}
 
 			/* Check for data on kernel pipe */
-			if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) {
-				(void) lttng_read(kernel_poll_pipe[0],
-						&tmp, 1);
-				/*
-				 * Ret value is useless here, if this pipe gets any actions an
-				 * update is required anyway.
-				 */
-				update_poll_flag = 1;
-				continue;
-			} else {
-				/*
-				 * New CPU detected by the kernel. Adding kernel stream to
-				 * kernel session and updating the kernel consumer
-				 */
-				if (revents & LPOLLIN) {
+			if (revents & LPOLLIN) {
+				if (pollfd == kernel_poll_pipe[0]) {
+					(void) lttng_read(kernel_poll_pipe[0],
+							&tmp, 1);
+					/*
+					 * Ret value is useless here, if this pipe gets any actions an
+					 * update is required anyway.
+					 */
+					update_poll_flag = 1;
+					continue;
+				} else {
+					/*
+					 * New CPU detected by the kernel. Adding kernel stream to
+					 * kernel session and updating the kernel consumer
+					 */
 					ret = update_kernel_stream(&kconsumer_data, pollfd);
 					if (ret < 0) {
 						continue;
 					}
 					break;
-				/*
-				 * TODO: We might want to handle the LPOLLERR | LPOLLHUP
-				 * and unregister kernel stream at this point.
-				 */
 				}
+			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+				update_poll_flag = 1;
+				continue;
+			} else {
+				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+				goto error;
 			}
 		}
 	}
@@ -1131,6 +1136,9 @@ static void *thread_manage_consumer(void *data)
 
 	DBG("[thread] Manage consumer started");
 
+	rcu_register_thread();
+	rcu_thread_online();
+
 	health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
 
 	health_code_update();
@@ -1199,9 +1207,14 @@ restart:
 
 			/* Event on the registration socket */
 			if (pollfd == consumer_data->err_sock) {
-				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+				if (revents & LPOLLIN) {
+					continue;
+				} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
 					ERR("consumer err socket poll error");
 					goto error;
+				} else {
+					ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+					goto error;
 				}
 			}
 		}
@@ -1331,7 +1344,8 @@ restart_poll:
 
 			if (pollfd == sock) {
 				/* Event on the consumerd socket */
-				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+						&& !(revents & LPOLLIN)) {
 					ERR("consumer err socket second poll error");
 					goto error;
 				}
@@ -1349,6 +1363,11 @@ restart_poll:
 					goto exit;
 				}
 			} else if (pollfd == consumer_data->metadata_fd) {
+				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+						&& !(revents & LPOLLIN)) {
+					ERR("consumer err metadata socket second poll error");
+					goto error;
+				}
 				/* UST metadata requests */
 				ret = ust_consumer_metadata_request(
 						&consumer_data->metadata_sock);
@@ -1429,6 +1448,9 @@ error_poll:
 	health_unregister(health_sessiond);
 	DBG("consumer thread cleanup completed");
 
+	rcu_thread_offline();
+	rcu_unregister_thread();
+
 	return NULL;
 }
 
@@ -1514,10 +1536,7 @@ static void *thread_manage_apps(void *data)
 
 			/* Inspect the apps cmd pipe */
 			if (pollfd == apps_cmd_pipe[0]) {
-				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-					ERR("Apps command pipe error");
-					goto error;
-				} else if (revents & LPOLLIN) {
+				if (revents & LPOLLIN) {
 					int sock;
 
 					/* Empty pipe */
@@ -1530,9 +1549,8 @@ static void *thread_manage_apps(void *data)
 					health_code_update();
 
 					/*
-					 * We only monitor the error events of the socket. This
-					 * thread does not handle any incoming data from UST
-					 * (POLLIN).
+					 * Since this is a command socket (write then read),
+					 * we only monitor the error events of the socket.
 					 */
 					ret = lttng_poll_add(&events, sock,
 							LPOLLERR | LPOLLHUP | LPOLLRDHUP);
@@ -1541,6 +1559,12 @@ static void *thread_manage_apps(void *data)
 					}
 
 					DBG("Apps with sock %d added to poll set", sock);
+				} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+					ERR("Apps command pipe error");
+					goto error;
+				} else {
+					ERR("Unknown poll events %u for sock %d", revents, pollfd);
+					goto error;
 				}
 			} else {
 				/*
@@ -1556,6 +1580,9 @@ static void *thread_manage_apps(void *data)
 
 					/* Socket closed on remote end. */
 					ust_app_unregister(pollfd);
+				} else {
+					ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+					goto error;
 				}
 			}
 
@@ -1697,6 +1724,9 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
 				ust_app_destroy(wait_node->app);
 				free(wait_node);
 				break;
+			} else {
+				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+				goto error;
 			}
 		}
 	}
@@ -1898,7 +1928,7 @@ static void *thread_dispatch_ust_registration(void *data)
 			 * Don't care about return value. Let the manage apps threads
 			 * handle app unregistration upon socket close.
 			 */
-			(void) ust_app_register_done(app->sock);
+			(void) ust_app_register_done(app);
 
 			/*
 			 * Even if the application socket has been closed, send the app
@@ -1940,6 +1970,22 @@ error:
 		free(wait_node);
 	}
 
+	/* Empty command queue. */
+	for (;;) {
+		/* Dequeue command for registration */
+		node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail);
+		if (node == NULL) {
+			break;
+		}
+		ust_cmd = caa_container_of(node, struct ust_command, node);
+		ret = close(ust_cmd->sock);
+		if (ret < 0) {
+			PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
+		}
+		lttng_fd_put(LTTNG_FD_APPS, 1);
+		free(ust_cmd);
+	}
+
 error_testpoint:
 	DBG("Dispatch thread dying");
 	if (err) {
@@ -2041,10 +2087,7 @@ static void *thread_registration_apps(void *data)
 
 			/* Event on the registration socket */
 			if (pollfd == apps_sock) {
-				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-					ERR("Register apps socket poll error");
-					goto error;
-				} else if (revents & LPOLLIN) {
+				if (revents & LPOLLIN) {
 					sock = lttcomm_accept_unix_sock(apps_sock);
 					if (sock < 0) {
 						goto error;
@@ -2072,6 +2115,10 @@ static void *thread_registration_apps(void *data)
 					ust_cmd = zmalloc(sizeof(struct ust_command));
 					if (ust_cmd == NULL) {
 						PERROR("ust command zmalloc");
+						ret = close(sock);
+						if (ret) {
+							PERROR("close");
+						}
 						goto error;
 					}
 
@@ -2127,6 +2174,12 @@ static void *thread_registration_apps(void *data)
 					 * barrier with the exchange in cds_wfcq_enqueue.
 					 */
 					futex_nto1_wake(&ust_cmd_queue.futex);
+				} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+					ERR("Register apps socket poll error");
+					goto error;
+				} else {
+					ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+					goto error;
 				}
 			}
 		}
@@ -2627,7 +2680,7 @@ static int copy_session_consumer(int domain, struct ltt_session *session)
 		 * domain.
 		 */
 		if (session->kernel_session->consumer) {
-			consumer_destroy_output(session->kernel_session->consumer);
+			consumer_output_put(session->kernel_session->consumer);
 		}
 		session->kernel_session->consumer =
 			consumer_copy_output(session->consumer);
@@ -2640,7 +2693,7 @@ static int copy_session_consumer(int domain, struct ltt_session *session)
 	case LTTNG_DOMAIN_UST:
 		DBG3("Copying tracing session consumer output in UST session");
 		if (session->ust_session->consumer) {
-			consumer_destroy_output(session->ust_session->consumer);
+			consumer_output_put(session->ust_session->consumer);
 		}
 		session->ust_session->consumer =
 			consumer_copy_output(session->consumer);
@@ -2750,7 +2803,7 @@ static int create_kernel_session(struct ltt_session *session)
 			session->kernel_session->consumer->dst.trace_path,
 			S_IRWXU | S_IRWXG, session->uid, session->gid);
 	if (ret < 0) {
-		if (ret != -EEXIST) {
+		if (errno != EEXIST) {
 			ERR("Trace directory creation error");
 			goto error;
 		}
@@ -3165,8 +3218,34 @@ skip_domain:
 	}
 	case LTTNG_DISABLE_EVENT:
 	{
+
+		/*
+		 * FIXME: handle filter; for now we just receive the filter's
+		 * bytecode along with the filter expression which are sent by
+		 * liblttng-ctl and discard them.
+		 *
+		 * This fixes an issue where the client may block while sending
+		 * the filter payload and encounter an error because the session
+		 * daemon closes the socket without ever handling this data.
+		 */
+		size_t count = cmd_ctx->lsm->u.disable.expression_len +
+				cmd_ctx->lsm->u.disable.bytecode_len;
+
+		if (count) {
+			char data[LTTNG_FILTER_MAX_LEN];
+
+			DBG("Discarding disable event command payload of size %zu", count);
+			while (count) {
+				ret = lttcomm_recv_unix_sock(sock, data,
+						count > sizeof(data) ? sizeof(data) : count);
+				if (ret < 0) {
+					goto error;
+				}
+
+				count -= (size_t) ret;
+			}
+		}
 		/* FIXME: passing packed structure to non-packed pointer */
-		/* TODO: handle filter */
 		ret = cmd_disable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type,
 				cmd_ctx->lsm->u.disable.channel_name,
 				&cmd_ctx->lsm->u.disable.event);
@@ -3903,9 +3982,14 @@ restart:
 
 			/* Event on the registration socket */
 			if (pollfd == sock) {
-				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+				if (revents & LPOLLIN) {
+					continue;
+				} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
 					ERR("Health socket poll error");
 					goto error;
+				} else {
+					ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+					goto error;
 				}
 			}
 		}
@@ -4080,9 +4164,14 @@ static void *thread_manage_clients(void *data)
 
 			/* Event on the registration socket */
 			if (pollfd == client_sock) {
-				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+				if (revents & LPOLLIN) {
+					continue;
+				} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
 					ERR("Client socket poll error");
 					goto error;
+				} else {
+					ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+					goto error;
 				}
 			}
 		}
@@ -5077,6 +5166,9 @@ int main(int argc, char **argv)
 	void *status;
 	const char *home_path, *env_app_timeout;
 
+	/* Initialize agent apps ht global variable */
+	agent_apps_ht_by_sock = NULL;
+
 	init_kernel_workarounds();
 
 	rcu_register_thread();
@@ -5268,20 +5360,24 @@ int main(int argc, char **argv)
 		goto error;
 	}
 
+	/* After this point, we can safely call cleanup() with "goto exit" */
+
 	/*
 	 * Init UST app hash table. Alloc hash table before this point since
 	 * cleanup() can get called after that point.
 	 */
 	ust_app_ht_alloc();
 
-	/* Initialize agent domain subsystem. */
-	if ((ret = agent_setup()) < 0) {
-		/* ENOMEM at this point. */
-		goto error;
+	/*
+	 * Initialize agent app hash table. We allocate the hash table here
+	 * since cleanup() can get called after this point.
+	 */
+	if (agent_app_ht_alloc()) {
+		ERR("Failed to allocate Agent app hash table");
+		ret = -1;
+		goto exit;
 	}
 
-	/* After this point, we can safely call cleanup() with "goto exit" */
-
 	/*
 	 * These actions must be executed as root. We do that *after* setting up
 	 * the sockets path because we MUST make the check for another daemon using
@@ -5579,6 +5675,7 @@ exit:
 	cleanup();
 	rcu_thread_offline();
 	rcu_unregister_thread();
+	rcu_barrier();
 	if (!ret) {
 		exit(EXIT_SUCCESS);
 	}
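
Note on the recurring pattern: most hunks above convert a poll loop to the same stricter event dispatch, i.e. handle LPOLLIN first, then the hang-up/error mask, and treat any remaining bits as a hard error instead of silently ignoring them. A minimal sketch of that shape (not standalone; revents, pollfd, ERR() and the error label belong to the enclosing thread loops, exactly as in the patch):

	/* Readable data takes priority over hang-up notifications. */
	if (revents & LPOLLIN) {
		/* Consume the pipe/socket or accept the connection. */
	} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
		/* Peer hang-up or socket error: tear this fd down. */
		goto error;
	} else {
		/* Any other bits indicate a logic error; fail loudly. */
		ERR("Unexpected poll events %u for sock %d", revents, pollfd);
		goto error;
	}

Checking LPOLLIN before the error mask matters because both can be raised at once: a peer can write its last bytes and close, and the old ordering dropped that final payload.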
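Note on the LTTNG_DISABLE_EVENT hunk: the added loop drains the filter payload in bounded chunks so liblttng-ctl never blocks in send() against a socket the daemon is about to close with data still queued. The same logic, lifted into a self-contained helper with a hypothetical name; it assumes lttcomm_recv_unix_sock() and LTTNG_FILTER_MAX_LEN from the lttng-tools sessiond-comm definitions, and additionally treats a zero-byte read (peer closed early) as an error so the loop cannot spin:

	#include <stddef.h>
	#include <sys/types.h>

	/*
	 * Read and discard `count` bytes from `sock`, at most
	 * LTTNG_FILTER_MAX_LEN bytes at a time. Returns 0 on success,
	 * -1 on socket error or early EOF.
	 */
	static int drain_unix_sock_payload(int sock, size_t count)
	{
		char data[LTTNG_FILTER_MAX_LEN];

		while (count) {
			ssize_t ret = lttcomm_recv_unix_sock(sock, data,
					count > sizeof(data) ? sizeof(data) : count);
			if (ret <= 0) {
				return -1;
			}
			count -= (size_t) ret;
		}
		return 0;
	}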
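Note on the RCU changes: thread_manage_consumer() now brackets its lifetime with rcu_register_thread()/rcu_unregister_thread(), which liburcu requires before a thread may enter RCU read-side critical sections, and main() gains an rcu_barrier() so pending call_rcu() callbacks complete before the process exits. The bracketing in isolation, as a sketch:

	#include <urcu.h>

	/*
	 * Sketch of the liburcu thread bracketing added to
	 * thread_manage_consumer(). rcu_thread_online()/offline() only
	 * matter for the QSBR flavor and are no-ops otherwise.
	 */
	static void *rcu_using_thread(void *data)
	{
		rcu_register_thread();
		rcu_thread_online();

		/* ... work using rcu_read_lock()/rcu_read_unlock() ... */

		rcu_thread_offline();
		rcu_unregister_thread();
		return data;
	}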
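Note on the create_kernel_session() hunk: the EEXIST change is a real bug fix, not a cleanup. Assuming run_as_mkdir_recursive() follows the usual libc convention (negative return with errno set, rather than returning -errno), the old test `ret != -EEXIST` was effectively always true, so an already-existing trace directory aborted session creation. The corrected shape, as a fragment (run_as_mkdir_recursive(), ERR() and the error label are the daemon's own; path, uid and gid stand in for the session fields used in the hunk):

	#include <errno.h>
	#include <sys/stat.h>

	ret = run_as_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid);
	if (ret < 0 && errno != EEXIST) {
		/* Any failure other than "already exists" is fatal. */
		ERR("Trace directory creation error");
		goto error;
	}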