X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.cpp;h=3c33ee79f5bafecc47d0509a42ccd387ee203607;hb=6bd798094fff9f499d840131dd6b61104d0823ce;hp=279bc7c5a26aa797c08d800b38b16ae98aa04627;hpb=8ebf1688e54cb656c2a9d17d70a829664e1eaf6b;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/notification-thread-events.cpp b/src/bin/lttng-sessiond/notification-thread-events.cpp index 279bc7c5a..3c33ee79f 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.cpp +++ b/src/bin/lttng-sessiond/notification-thread-events.cpp @@ -47,8 +47,8 @@ #include "lttng-sessiond.hpp" #include "kernel.hpp" -#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) -#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT) +#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) +#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT) /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */ #define MAX_CAPTURE_SIZE (PIPE_BUF) @@ -60,12 +60,6 @@ enum lttng_object_type { LTTNG_OBJECT_TYPE_SESSION, }; -struct lttng_trigger_list_element { - /* No ownership of the trigger object is assumed. */ - struct lttng_trigger *trigger; - struct cds_list_head node; -}; - struct lttng_channel_trigger_list { struct channel_key channel_key; /* List of struct lttng_trigger_list_element. */ @@ -117,6 +111,13 @@ struct lttng_session_trigger_list { struct rcu_head rcu_node; }; +namespace { +struct lttng_trigger_list_element { + /* No ownership of the trigger object is assumed. */ + struct lttng_trigger *trigger; + struct cds_list_head node; +}; + struct lttng_trigger_ht_element { struct lttng_trigger *trigger; struct cds_lfht_node node; @@ -140,6 +141,7 @@ struct channel_state_sample { /* call_rcu delayed reclaim. */ struct rcu_head rcu_node; }; +} /* namespace */ static unsigned long hash_channel_key(struct channel_key *key); static int evaluate_buffer_condition(const struct lttng_condition *condition, @@ -223,8 +225,8 @@ int match_client_id(struct cds_lfht_node *node, const void *key) { /* This double-cast is intended to supress pointer-to-cast warning. */ const notification_client_id id = *((notification_client_id *) key); - const struct notification_client *client = caa_container_of( - node, struct notification_client, client_id_ht_node); + const struct notification_client *client = lttng::utils::container_of( + node, ¬ification_client::client_id_ht_node); return client->id == id; } @@ -323,8 +325,8 @@ static int match_session(struct cds_lfht_node *node, const void *key) { const char *name = (const char *) key; - struct session_info *session_info = caa_container_of( - node, struct session_info, sessions_ht_node); + struct session_info *session_info = lttng::utils::container_of( + node, &session_info::sessions_ht_node); return !strcmp(session_info->name, name); } @@ -491,7 +493,7 @@ enum lttng_object_type get_condition_binding_object( static void free_channel_info_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct channel_info, rcu_node)); + free(lttng::utils::container_of(node, &channel_info::rcu_node)); } static @@ -515,7 +517,7 @@ void channel_info_destroy(struct channel_info *channel_info) static void free_session_info_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct session_info, rcu_node)); + free(lttng::utils::container_of(node, &session_info::rcu_node)); } /* Don't call directly, use the ref-counting mechanism. */ @@ -670,7 +672,7 @@ static void notification_client_list_release(struct urcu_ref *list_ref) { struct notification_client_list *list = - container_of(list_ref, typeof(*list), ref); + lttng::utils::container_of(list_ref, ¬ification_client_list::ref); struct notification_client_list_element *client_list_element, *tmp; lttng_condition_put(list->condition); @@ -814,8 +816,8 @@ struct notification_client_list *get_client_list_from_condition( &iter); node = cds_lfht_iter_get_node(&iter); if (node) { - list = container_of(node, struct notification_client_list, - notification_trigger_clients_ht_node); + list = lttng::utils::container_of(node, + ¬ification_client_list::notification_trigger_clients_ht_node); list = notification_client_list_get(list) ? list : NULL; } @@ -1282,7 +1284,7 @@ end: static void free_notification_client_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct notification_client, rcu_node)); + free(lttng::utils::container_of(node, ¬ification_client::rcu_node)); } static @@ -3123,28 +3125,43 @@ end: return 0; } +static +int pop_cmd_queue(struct notification_thread_handle *handle, + struct notification_thread_command **cmd) +{ + int ret; + uint64_t counter; + + pthread_mutex_lock(&handle->cmd_queue.lock); + ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter)); + if (ret != sizeof(counter)) { + ret = -1; + goto error_unlock; + } + + *cmd = cds_list_first_entry(&handle->cmd_queue.list, + struct notification_thread_command, cmd_list_node); + cds_list_del(&((*cmd)->cmd_list_node)); + ret = 0; + +error_unlock: + pthread_mutex_unlock(&handle->cmd_queue.lock); + return ret; +} + /* Returns 0 on success, 1 on exit requested, negative value on error. */ int handle_notification_thread_command( struct notification_thread_handle *handle, struct notification_thread_state *state) { int ret; - uint64_t counter; struct notification_thread_command *cmd; - /* Read the event pipe to put it back into a quiescent state. */ - ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, - sizeof(counter)); - if (ret != sizeof(counter)) { + ret = pop_cmd_queue(handle, &cmd); + if (ret) { goto error; } - pthread_mutex_lock(&handle->cmd_queue.lock); - cmd = cds_list_first_entry(&handle->cmd_queue.list, - struct notification_thread_command, cmd_list_node); - cds_list_del(&cmd->cmd_list_node); - pthread_mutex_unlock(&handle->cmd_queue.lock); - DBG("Received `%s` command", notification_command_type_str(cmd->type)); switch (cmd->type) { @@ -3379,9 +3396,9 @@ int handle_notification_thread_client_connect( goto error; } + client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN; ret = lttng_poll_add(&state->events, client->socket, - LPOLLIN | LPOLLERR | - LPOLLHUP | LPOLLRDHUP); + client->communication.current_poll_events); if (ret < 0) { ERR("Failed to add notification channel client socket to poll set"); ret = 0; @@ -3515,6 +3532,18 @@ int handle_notification_thread_trigger_unregister_all( return error_occurred ? -1 : 0; } +static +bool client_has_outbound_data_left( + const struct notification_client *client) +{ + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const bool has_data = pv.buffer.size != 0; + const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); + + return has_data || has_fds; +} + static int client_handle_transmission_status( struct notification_client *client, @@ -3525,24 +3554,51 @@ int client_handle_transmission_status( switch (transmission_status) { case CLIENT_TRANSMISSION_STATUS_COMPLETE: - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN); - if (ret) { - goto end; - } - - break; case CLIENT_TRANSMISSION_STATUS_QUEUED: + { + int current_poll_events; + int new_poll_events; /* * We want to be notified whenever there is buffer space - * available to send the rest of the payload. + * available to send the rest of the payload if we are + * waiting to send data to the client. + * + * The state of the outbound queue being sampled here is + * fine since: + * - it is okay to wake-up "for nothing" in case we see + * that data is left, but another thread succeeds in + * flushing it before us when handling the client "out" + * event. We will simply stop monitoring that event the next + * time it wakes us up and we see no data left to be sent, + * - if another thread fails to flush the entire client + * outgoing queue, it will issue a "communication update" + * command and cause the client's (e)poll mask to be + * re-evaluated. + * + * The situation we seek to avoid would be to disable the + * monitoring of "out" client events indefinitely when there is + * data to be sent, which can't happen because of the + * aforementioned "communication update" mechanism. */ - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN_OUT); - if (ret) { - goto end; + pthread_mutex_lock(&client->lock); + current_poll_events = client->communication.current_poll_events; + new_poll_events = client_has_outbound_data_left(client) ? + CLIENT_POLL_EVENTS_IN_OUT : + CLIENT_POLL_EVENTS_IN; + client->communication.current_poll_events = new_poll_events; + pthread_mutex_unlock(&client->lock); + + /* Update the monitored event set only if it changed. */ + if (current_poll_events != new_poll_events) { + ret = lttng_poll_mod(&state->events, client->socket, + new_poll_events); + if (ret) { + goto end; + } } + break; + } case CLIENT_TRANSMISSION_STATUS_FAIL: ret = notification_thread_client_disconnect(client, state); if (ret) { @@ -3682,18 +3738,6 @@ error: return CLIENT_TRANSMISSION_STATUS_ERROR; } -static -bool client_has_outbound_data_left( - const struct notification_client *client) -{ - const struct lttng_payload_view pv = lttng_payload_view_from_payload( - &client->communication.outbound.payload, 0, -1); - const bool has_data = pv.buffer.size != 0; - const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); - - return has_data || has_fds; -} - /* Client lock must _not_ be held by the caller. */ static int client_send_command_reply(struct notification_client *client, @@ -3704,14 +3748,14 @@ int client_send_command_reply(struct notification_client *client, struct lttng_notification_channel_command_reply reply = { .status = (int8_t) status, }; - struct lttng_notification_channel_message msg = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY, - .size = sizeof(reply), - .fds = 0, - }; + struct lttng_notification_channel_message msg; char buffer[sizeof(msg) + sizeof(reply)]; enum client_transmission_status transmission_status; + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY; + msg.size = sizeof(reply); + msg.fds = 0; + memcpy(buffer, &msg, sizeof(msg)); memcpy(buffer + sizeof(msg), &reply, sizeof(reply)); DBG("Send command reply (%i)", (int) status); @@ -3809,15 +3853,15 @@ int client_handle_message_handshake(struct notification_client *client, .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR, .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR, }; - const struct lttng_notification_channel_message msg_header = { - .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE, - .size = sizeof(handshake_reply), - .fds = 0, - }; + struct lttng_notification_channel_message msg_header; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)]; + msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE; + msg_header.size = sizeof(handshake_reply); + msg_header.fds = 0; + memcpy(send_buffer, &msg_header, sizeof(msg_header)); memcpy(send_buffer + sizeof(msg_header), &handshake_reply, sizeof(handshake_reply)); @@ -4102,7 +4146,47 @@ int handle_notification_thread_client_out( } pthread_mutex_lock(&client->lock); - transmission_status = client_flush_outgoing_queue(client); + if (!client_has_outbound_data_left(client)) { + /* + * A client "out" event can be received when no payload is left + * to send under some circumstances. + * + * Many threads can flush a client's outgoing queue and, if they + * had to queue their message (socket was full), will use the + * "communication update" command to signal the (e)poll thread + * to monitor for space being made available in the socket. + * + * Commands are sent over an internal pipe serviced by the same + * thread as the client sockets. + * + * When space is made available in the socket, there is a race + * between the (e)poll thread and the other threads that may + * wish to use the client's socket to flush its outgoing queue. + * + * A non-(e)poll thread may attempt (and succeed) in flushing + * the queue before the (e)poll thread gets a chance to service + * the client's "out" event. + * + * In this situation, the (e)poll thread processing the client + * out event will see an empty payload: there is nothing to do + * except unsubscribing (e)poll "out" events. + * + * Note that this thread is the (e)poll thread so it can modify + * the (e)poll mask directly without using a communication + * update command. Other threads that flush the outgoing queue + * will use the "communication update" command to wake up this + * thread and force it to monitor "out" events. + * + * When other threads succeed in emptying the outgoing queue, + * they don't need to update the (e)poll mask: if the "out" + * event is monitored, it will fire once and the (e)poll + * thread will reach this condition, causing the event to + * stop being monitored. + */ + transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + } else { + transmission_status = client_flush_outgoing_queue(client); + } pthread_mutex_unlock(&client->lock); ret = client_handle_transmission_status( @@ -4123,9 +4207,8 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition, bool result = false; uint64_t threshold; enum lttng_condition_type condition_type; - const struct lttng_condition_buffer_usage *use_condition = container_of( - condition, struct lttng_condition_buffer_usage, - parent); + const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of( + condition, <tng_condition_buffer_usage::parent); if (use_condition->threshold_bytes.set) { threshold = use_condition->threshold_bytes.value; @@ -4181,9 +4264,8 @@ bool evaluate_session_consumed_size_condition( { uint64_t threshold; const struct lttng_condition_session_consumed_size *size_condition = - container_of(condition, - struct lttng_condition_session_consumed_size, - parent); + lttng::utils::container_of(condition, + <tng_condition_session_consumed_size::parent); threshold = size_condition->consumed_threshold_bytes.value; DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64, @@ -4281,11 +4363,11 @@ static int client_notification_overflow(struct notification_client *client) { int ret = 0; - const struct lttng_notification_channel_message msg = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED, - .size = 0, - .fds = 0, - }; + struct lttng_notification_channel_message msg; + + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED; + msg.size = 0; + msg.fds = 0; ASSERT_LOCKED(client->lock); @@ -4387,16 +4469,16 @@ int notification_client_list_send_evaluation( .trigger = (struct lttng_trigger *) trigger, .evaluation = (struct lttng_evaluation *) evaluation, }; - struct lttng_notification_channel_message msg_header = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION, - .size = 0, - .fds = 0, - }; + struct lttng_notification_channel_message msg_header; const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger); lttng_payload_init(&msg_payload); + msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION; + msg_header.size = 0; + msg_header.fds = 0; + ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header, sizeof(msg_header)); if (ret) { @@ -4666,10 +4748,9 @@ int dispatch_one_event_notifier_notification(struct notification_thread_state *s } evaluation = lttng_evaluation_event_rule_matches_create( - container_of(lttng_trigger_get_const_condition( + lttng::utils::container_of(lttng_trigger_get_const_condition( element->trigger), - struct lttng_condition_event_rule_matches, - parent), + <tng_condition_event_rule_matches::parent), notification->capture_buffer, notification->capture_buf_size, false);