X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.c;h=76a7363bf8d03bb907f420abdbd377deff7f8563;hb=428b440c632be8aa4938ae7fcbd5831d492a1d77;hp=c7037a482bfcdeccf70e0159089467196b690a79;hpb=e1e94513ca41033ebfd94f23d1656b8ab6576e88;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index c7037a482..76a7363bf 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -48,8 +48,8 @@ #include "lttng-sessiond.h" #include "kernel.h" -#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) -#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT) +#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) +#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT) /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */ #define MAX_CAPTURE_SIZE (PIPE_BUF) @@ -2039,7 +2039,7 @@ int handle_notification_thread_command_add_tracer_event_source( lttng_domain_type_str(domain_type)); /* Adding the read side pipe to the event poll. */ - ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR); if (ret < 0) { ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'", tracer_event_source_fd, @@ -2110,95 +2110,121 @@ end: } static -int handle_notification_thread_command_remove_tracer_event_source( - struct notification_thread_state *state, - int tracer_event_source_fd, - enum lttng_error_code *_cmd_result) +struct notification_event_tracer_event_source_element * +find_tracer_event_source_element(struct notification_thread_state *state, + int tracer_event_source_fd) { - int ret = 0; - bool found = false; - enum lttng_error_code cmd_result = LTTNG_OK; - struct notification_event_tracer_event_source_element *source_element = NULL, *tmp; + struct notification_event_tracer_event_source_element *source_element; - cds_list_for_each_entry_safe(source_element, tmp, + cds_list_for_each_entry(source_element, &state->tracer_event_sources_list, node) { - if (source_element->fd != tracer_event_source_fd) { - continue; + if (source_element->fd == tracer_event_source_fd) { + goto end; } - - DBG("Removed tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'", - tracer_event_source_fd, - lttng_domain_type_str(source_element->domain)); - cds_list_del(&source_element->node); - found = true; - break; } - if (!found) { - /* - * This is temporarily allowed since the poll activity set is - * not properly cleaned-up for the moment. This is adressed in - * an upcoming fix. - */ - source_element = NULL; - goto end; - } + source_element = NULL; +end: + return source_element; +} - if (!source_element->is_fd_in_poll_set) { - /* Skip the poll set removal. */ - goto end; - } +static +int remove_tracer_event_source_from_pollset( + struct notification_thread_state *state, + struct notification_event_tracer_event_source_element *source_element) +{ + int ret = 0; + + assert(source_element->is_fd_in_poll_set); DBG3("Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'", - tracer_event_source_fd, + source_element->fd, lttng_domain_type_str(source_element->domain)); /* Removing the fd from the event poll set. */ - ret = lttng_poll_del(&state->events, tracer_event_source_fd); + ret = lttng_poll_del(&state->events, source_element->fd); if (ret < 0) { ERR("Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'", - tracer_event_source_fd, + source_element->fd, lttng_domain_type_str(source_element->domain)); - cmd_result = LTTNG_ERR_FATAL; + ret = -1; goto end; } source_element->is_fd_in_poll_set = false; - ret = drain_event_notifier_notification_pipe(state, tracer_event_source_fd, + /* + * Force the notification thread to restart the poll() loop to ensure + * that any events from the removed fd are removed. + */ + state->restart_poll = true; + + ret = drain_event_notifier_notification_pipe(state, source_element->fd, source_element->domain); if (ret) { ERR("Error draining event notifier notification: tracer_event_source_fd = %d, domain = %s", - tracer_event_source_fd, + source_element->fd, lttng_domain_type_str(source_element->domain)); - cmd_result = LTTNG_ERR_FATAL; + ret = -1; goto end; } - /* - * The drain_event_notifier_notification_pipe() call might have read - * data from an fd that we received in event in the latest _poll_wait() - * call. Make sure the thread call poll_wait() again to ensure we have - * a clean state. - */ - state->restart_poll = true; - end: - free(source_element); - *_cmd_result = cmd_result; return ret; } -int handle_notification_thread_remove_tracer_event_source_no_result( +int handle_notification_thread_tracer_event_source_died( struct notification_thread_state *state, int tracer_event_source_fd) { - int ret; - enum lttng_error_code cmd_result; + int ret = 0; + struct notification_event_tracer_event_source_element *source_element; + + source_element = find_tracer_event_source_element(state, + tracer_event_source_fd); + + assert(source_element); + + ret = remove_tracer_event_source_from_pollset(state, source_element); + if (ret) { + ERR("Failed to remove dead tracer event source from poll set"); + } + + return ret; +} + +static +int handle_notification_thread_command_remove_tracer_event_source( + struct notification_thread_state *state, + int tracer_event_source_fd, + enum lttng_error_code *_cmd_result) +{ + int ret = 0; + enum lttng_error_code cmd_result = LTTNG_OK; + struct notification_event_tracer_event_source_element *source_element = NULL; + + source_element = find_tracer_event_source_element(state, + tracer_event_source_fd); + + assert(source_element); - ret = handle_notification_thread_command_remove_tracer_event_source( - state, tracer_event_source_fd, &cmd_result); - (void) cmd_result; + /* Remove the tracer source from the list. */ + cds_list_del(&source_element->node); + + if (!source_element->is_fd_in_poll_set) { + /* Skip the poll set removal. */ + goto end; + } + + ret = remove_tracer_event_source_from_pollset(state, source_element); + if (ret) { + ERR("Failed to remove tracer event source from poll set"); + cmd_result = LTTNG_ERR_FATAL; + } + +end: + free(source_element); + *_cmd_result = cmd_result; return ret; } @@ -3083,28 +3109,55 @@ end: return 0; } +static +int pop_cmd_queue(struct notification_thread_handle *handle, + struct notification_thread_command **cmd) +{ + int ret; + uint64_t counter; + + pthread_mutex_lock(&handle->cmd_queue.lock); + ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter)); + if (ret != sizeof(counter)) { + ret = -1; + goto error_unlock; + } + + /* Simulate behaviour of EFD_SEMAPHORE for older kernels. */ + counter -= 1; + if (counter != 0) { + ret = lttng_write(handle->cmd_queue.event_fd, &counter, + sizeof(counter)); + if (ret != sizeof(counter)) { + PERROR("Failed to write back to event_fd for EFD_SEMAPHORE emulation"); + ret = -1; + goto error_unlock; + } + } + + *cmd = cds_list_first_entry(&handle->cmd_queue.list, + struct notification_thread_command, cmd_list_node); + cds_list_del(&((*cmd)->cmd_list_node)); + ret = 0; + +error_unlock: + pthread_mutex_unlock(&handle->cmd_queue.lock); + return ret; +} + /* Returns 0 on success, 1 on exit requested, negative value on error. */ int handle_notification_thread_command( struct notification_thread_handle *handle, struct notification_thread_state *state) { int ret; - uint64_t counter; struct notification_thread_command *cmd; - /* Read the event pipe to put it back into a quiescent state. */ - ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, - sizeof(counter)); - if (ret != sizeof(counter)) { + ret = pop_cmd_queue(handle, &cmd); + if (ret) { goto error; } - pthread_mutex_lock(&handle->cmd_queue.lock); - cmd = cds_list_first_entry(&handle->cmd_queue.list, - struct notification_thread_command, cmd_list_node); - cds_list_del(&cmd->cmd_list_node); - pthread_mutex_unlock(&handle->cmd_queue.lock); - DBG("Received `%s` command", notification_command_type_str(cmd->type)); switch (cmd->type) { @@ -3339,9 +3392,9 @@ int handle_notification_thread_client_connect( goto error; } + client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN; ret = lttng_poll_add(&state->events, client->socket, - LPOLLIN | LPOLLERR | - LPOLLHUP | LPOLLRDHUP); + client->communication.current_poll_events); if (ret < 0) { ERR("Failed to add notification channel client socket to poll set"); ret = 0; @@ -3473,6 +3526,18 @@ int handle_notification_thread_trigger_unregister_all( return error_occurred ? -1 : 0; } +static +bool client_has_outbound_data_left( + const struct notification_client *client) +{ + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const bool has_data = pv.buffer.size != 0; + const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); + + return has_data || has_fds; +} + static int client_handle_transmission_status( struct notification_client *client, @@ -3483,24 +3548,51 @@ int client_handle_transmission_status( switch (transmission_status) { case CLIENT_TRANSMISSION_STATUS_COMPLETE: - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN); - if (ret) { - goto end; - } - - break; case CLIENT_TRANSMISSION_STATUS_QUEUED: + { + int current_poll_events; + int new_poll_events; /* * We want to be notified whenever there is buffer space - * available to send the rest of the payload. + * available to send the rest of the payload if we are + * waiting to send data to the client. + * + * The state of the outbound queue being sampled here is + * fine since: + * - it is okay to wake-up "for nothing" in case we see + * that data is left, but another thread succeeds in + * flushing it before us when handling the client "out" + * event. We will simply stop monitoring that event the next + * time it wakes us up and we see no data left to be sent, + * - if another thread fails to flush the entire client + * outgoing queue, it will issue a "communication update" + * command and cause the client's (e)poll mask to be + * re-evaluated. + * + * The situation we seek to avoid would be to disable the + * monitoring of "out" client events indefinitely when there is + * data to be sent, which can't happen because of the + * aforementioned "communication update" mechanism. */ - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN_OUT); - if (ret) { - goto end; + pthread_mutex_lock(&client->lock); + current_poll_events = client->communication.current_poll_events; + new_poll_events = client_has_outbound_data_left(client) ? + CLIENT_POLL_EVENTS_IN_OUT : + CLIENT_POLL_EVENTS_IN; + client->communication.current_poll_events = new_poll_events; + pthread_mutex_unlock(&client->lock); + + /* Update the monitored event set only if it changed. */ + if (current_poll_events != new_poll_events) { + ret = lttng_poll_mod(&state->events, client->socket, + new_poll_events); + if (ret) { + goto end; + } } + break; + } case CLIENT_TRANSMISSION_STATUS_FAIL: ret = notification_thread_client_disconnect(client, state); if (ret) { @@ -3640,18 +3732,6 @@ error: return CLIENT_TRANSMISSION_STATUS_ERROR; } -static -bool client_has_outbound_data_left( - const struct notification_client *client) -{ - const struct lttng_payload_view pv = lttng_payload_view_from_payload( - &client->communication.outbound.payload, 0, -1); - const bool has_data = pv.buffer.size != 0; - const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); - - return has_data || has_fds; -} - /* Client lock must _not_ be held by the caller. */ static int client_send_command_reply(struct notification_client *client, @@ -3794,9 +3874,11 @@ int client_handle_message_handshake(struct notification_client *client, &client->communication.inbound.creds); client->gid = LTTNG_SOCK_GET_GID_CRED( &client->communication.inbound.creds); - DBG("Received handshake from client (uid = %u, gid = %u) with version %i.%i", + client->is_sessiond = LTTNG_SOCK_GET_PID_CRED(&client->communication.inbound.creds) == getpid(); + DBG("Received handshake from client: uid = %u, gid = %u, protocol version = %i.%i, client is sessiond = %s", client->uid, client->gid, (int) client->major, - (int) client->minor); + (int) client->minor, + client->is_sessiond ? "true" : "false"); if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) { @@ -4056,7 +4138,47 @@ int handle_notification_thread_client_out( } pthread_mutex_lock(&client->lock); - transmission_status = client_flush_outgoing_queue(client); + if (!client_has_outbound_data_left(client)) { + /* + * A client "out" event can be received when no payload is left + * to send under some circumstances. + * + * Many threads can flush a client's outgoing queue and, if they + * had to queue their message (socket was full), will use the + * "communication update" command to signal the (e)poll thread + * to monitor for space being made available in the socket. + * + * Commands are sent over an internal pipe serviced by the same + * thread as the client sockets. + * + * When space is made available in the socket, there is a race + * between the (e)poll thread and the other threads that may + * wish to use the client's socket to flush its outgoing queue. + * + * A non-(e)poll thread may attempt (and succeed) in flushing + * the queue before the (e)poll thread gets a chance to service + * the client's "out" event. + * + * In this situation, the (e)poll thread processing the client + * out event will see an empty payload: there is nothing to do + * except unsubscribing (e)poll "out" events. + * + * Note that this thread is the (e)poll thread so it can modify + * the (e)poll mask directly without using a communication + * update command. Other threads that flush the outgoing queue + * will use the "communication update" command to wake up this + * thread and force it to monitor "out" events. + * + * When other threads succeed in emptying the outgoing queue, + * they don't need to update the (e)poll mask: if the "out" + * event is monitored, it will fire once and the (e)poll + * thread will reach this condition, causing the event to + * stop being monitored. + */ + transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + } else { + transmission_status = client_flush_outgoing_queue(client); + } pthread_mutex_unlock(&client->lock); ret = client_handle_transmission_status( @@ -4394,6 +4516,15 @@ int notification_client_list_send_evaluation( goto skip_client; } + if (lttng_trigger_is_hidden(trigger) && !client->is_sessiond) { + /* + * Notifications resulting from an hidden trigger are + * only sent to the session daemon. + */ + DBG("Skipping client as the trigger is hidden and the client is not the session daemon"); + goto skip_client; + } + if (source_object_creds) { if (client->uid != lttng_credentials_get_uid(source_object_creds) && client->gid != lttng_credentials_get_gid(source_object_creds) &&