#include "lttng-sessiond.h"
#include "kernel.h"
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
/* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
#define MAX_CAPTURE_SIZE (PIPE_BUF)
lttng_domain_type_str(domain_type));
/* Adding the read side pipe to the event poll. */
- ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
+ ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR);
if (ret < 0) {
ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
tracer_event_source_fd,
}
+/*
+ * Find the tracer event source element associated with an fd.
+ *
+ * Walks state->tracer_event_sources_list and returns the element whose
+ * 'fd' matches tracer_event_source_fd, or NULL if no element matches.
+ * Does not modify the list; the caller owns any subsequent removal.
+ */
static
-int handle_notification_thread_command_remove_tracer_event_source(
-		struct notification_thread_state *state,
-		int tracer_event_source_fd,
-		enum lttng_error_code *_cmd_result)
+struct notification_event_tracer_event_source_element *
+find_tracer_event_source_element(struct notification_thread_state *state,
+		int tracer_event_source_fd)
{
-	int ret = 0;
-	bool found = false;
-	enum lttng_error_code cmd_result = LTTNG_OK;
-	struct notification_event_tracer_event_source_element *source_element = NULL, *tmp;
+	struct notification_event_tracer_event_source_element *source_element;
-	cds_list_for_each_entry_safe(source_element, tmp,
+	cds_list_for_each_entry(source_element,
			&state->tracer_event_sources_list, node) {
-		if (source_element->fd != tracer_event_source_fd) {
-			continue;
+		if (source_element->fd == tracer_event_source_fd) {
+			goto end;
		}
-
-		DBG("Removed tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
-				tracer_event_source_fd,
-				lttng_domain_type_str(source_element->domain));
-		cds_list_del(&source_element->node);
-		found = true;
-		break;
	}
-	if (!found) {
-		/*
-		 * This is temporarily allowed since the poll activity set is
-		 * not properly cleaned-up for the moment. This is adressed in
-		 * an upcoming fix.
-		 */
-		source_element = NULL;
-		goto end;
-	}
+	/* No matching element was found. */
+	source_element = NULL;
+end:
+	return source_element;
+}
- if (!source_element->is_fd_in_poll_set) {
- /* Skip the poll set removal. */
- goto end;
- }
+/*
+ * Remove a tracer event source's fd from the notification thread's
+ * (e)poll set and drain any notifications still pending on it.
+ *
+ * The fd must currently be part of the poll set (asserted). On success,
+ * is_fd_in_poll_set is cleared and the poll loop is flagged for restart
+ * so that no stale events for the removed fd are serviced.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static
+int remove_tracer_event_source_from_pollset(
+		struct notification_thread_state *state,
+		struct notification_event_tracer_event_source_element *source_element)
+{
+	int ret = 0;
+
+	assert(source_element->is_fd_in_poll_set);
	DBG3("Removing tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
-			tracer_event_source_fd,
+			source_element->fd,
			lttng_domain_type_str(source_element->domain));
	/* Removing the fd from the event poll set. */
-	ret = lttng_poll_del(&state->events, tracer_event_source_fd);
+	ret = lttng_poll_del(&state->events, source_element->fd);
	if (ret < 0) {
		ERR("Failed to remove tracer event source from poll set: tracer_event_source_fd = %d, domain = '%s'",
-				tracer_event_source_fd,
+				source_element->fd,
				lttng_domain_type_str(source_element->domain));
-		cmd_result = LTTNG_ERR_FATAL;
+		ret = -1;
		goto end;
	}
	source_element->is_fd_in_poll_set = false;
-	ret = drain_event_notifier_notification_pipe(state, tracer_event_source_fd,
+	/*
+	 * Force the notification thread to restart the poll() loop to ensure
+	 * that no stale events from the removed fd are serviced.
+	 */
+	state->restart_poll = true;
+
+	ret = drain_event_notifier_notification_pipe(state, source_element->fd,
			source_element->domain);
	if (ret) {
		ERR("Error draining event notifier notification: tracer_event_source_fd = %d, domain = %s",
-				tracer_event_source_fd,
+				source_element->fd,
				lttng_domain_type_str(source_element->domain));
-		cmd_result = LTTNG_ERR_FATAL;
+		ret = -1;
		goto end;
	}
-	/*
-	 * The drain_event_notifier_notification_pipe() call might have read
-	 * data from an fd that we received in event in the latest _poll_wait()
-	 * call. Make sure the thread call poll_wait() again to ensure we have
-	 * a clean state.
-	 */
-	state->restart_poll = true;
-
end:
-	free(source_element);
-	*_cmd_result = cmd_result;
	return ret;
}
+/*
+ * Handle the death (hang-up) of a tracer event source fd.
+ *
+ * Looks up the source element for tracer_event_source_fd (which must
+ * exist — asserted) and removes its fd from the poll set. The element
+ * itself stays on tracer_event_sources_list; only the poll-set
+ * membership is torn down here.
+ *
+ * Returns 0 on success, -1 on error.
+ */
-int handle_notification_thread_remove_tracer_event_source_no_result(
+int handle_notification_thread_tracer_event_source_died(
		struct notification_thread_state *state,
		int tracer_event_source_fd)
{
-	int ret;
-	enum lttng_error_code cmd_result;
+	int ret = 0;
+	struct notification_event_tracer_event_source_element *source_element;
+
+	source_element = find_tracer_event_source_element(state,
+			tracer_event_source_fd);
+
+	assert(source_element);
+
+	ret = remove_tracer_event_source_from_pollset(state, source_element);
+	if (ret) {
+		ERR("Failed to remove dead tracer event source from poll set");
+	}
+
+	return ret;
+}
+
+/*
+ * Handle the "remove tracer event source" command.
+ *
+ * Unlinks the source element for tracer_event_source_fd (which must
+ * exist — asserted) from tracer_event_sources_list, removes its fd from
+ * the poll set when it is still monitored, and frees the element.
+ *
+ * Returns 0 on success, -1 on error; the command result is reported to
+ * the caller through _cmd_result (LTTNG_OK or LTTNG_ERR_FATAL).
+ */
+static
+int handle_notification_thread_command_remove_tracer_event_source(
+		struct notification_thread_state *state,
+		int tracer_event_source_fd,
+		enum lttng_error_code *_cmd_result)
+{
+	int ret = 0;
+	enum lttng_error_code cmd_result = LTTNG_OK;
+	struct notification_event_tracer_event_source_element *source_element = NULL;
-	ret = handle_notification_thread_command_remove_tracer_event_source(
-			state, tracer_event_source_fd, &cmd_result);
-	(void) cmd_result;
+	source_element = find_tracer_event_source_element(state,
+			tracer_event_source_fd);
+
+	assert(source_element);
+
+	/* Remove the tracer source from the list. */
+	cds_list_del(&source_element->node);
+
+	if (!source_element->is_fd_in_poll_set) {
+		/* Skip the poll set removal. */
+		goto end;
+	}
+
+	ret = remove_tracer_event_source_from_pollset(state, source_element);
+	if (ret) {
+		ERR("Failed to remove tracer event source from poll set");
+		cmd_result = LTTNG_ERR_FATAL;
+	}
+
+end:
+	free(source_element);
+	*_cmd_result = cmd_result;
	return ret;
}
client_list = notification_client_list_create(state, condition);
if (!client_list) {
ERR("Error creating notification client list for trigger %s", trigger->name);
+ ret = -1;
goto error_free_ht_element;
}
}
return 0;
}
+/*
+ * Pop the oldest command from the handle's command queue.
+ *
+ * Performs one "semaphore down" on the queue's event_fd: the counter is
+ * read, decremented, and written back when commands remain, emulating
+ * EFD_SEMAPHORE behaviour for kernels that lack it. The popped command
+ * is unlinked from cmd_queue.list and returned through *cmd.
+ *
+ * The cmd_queue lock is taken and released internally; the caller must
+ * not hold it.
+ *
+ * Returns 0 on success, -1 on read/write error.
+ */
+static
+int pop_cmd_queue(struct notification_thread_handle *handle,
+		struct notification_thread_command **cmd)
+{
+	int ret;
+	uint64_t counter;
+
+	pthread_mutex_lock(&handle->cmd_queue.lock);
+	ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+	if (ret != sizeof(counter)) {
+		ret = -1;
+		goto error_unlock;
+	}
+
+	/* Simulate behaviour of EFD_SEMAPHORE for older kernels. */
+	counter -= 1;
+	if (counter != 0) {
+		ret = lttng_write(handle->cmd_queue.event_fd, &counter,
+				sizeof(counter));
+		if (ret != sizeof(counter)) {
+			PERROR("Failed to write back to event_fd for EFD_SEMAPHORE emulation");
+			ret = -1;
+			goto error_unlock;
+		}
+	}
+
+	*cmd = cds_list_first_entry(&handle->cmd_queue.list,
+			struct notification_thread_command, cmd_list_node);
+	cds_list_del(&((*cmd)->cmd_list_node));
+	ret = 0;
+
+error_unlock:
+	pthread_mutex_unlock(&handle->cmd_queue.lock);
+	return ret;
+}
+
/* Returns 0 on success, 1 on exit requested, negative value on error. */
int handle_notification_thread_command(
struct notification_thread_handle *handle,
struct notification_thread_state *state)
{
int ret;
- uint64_t counter;
struct notification_thread_command *cmd;
- /* Read the event pipe to put it back into a quiescent state. */
- ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
- sizeof(counter));
- if (ret != sizeof(counter)) {
+ ret = pop_cmd_queue(handle, &cmd);
+ if (ret) {
goto error;
}
- pthread_mutex_lock(&handle->cmd_queue.lock);
- cmd = cds_list_first_entry(&handle->cmd_queue.list,
- struct notification_thread_command, cmd_list_node);
- cds_list_del(&cmd->cmd_list_node);
- pthread_mutex_unlock(&handle->cmd_queue.lock);
-
DBG("Received `%s` command",
notification_command_type_str(cmd->type));
switch (cmd->type) {
free(cmd);
cmd = NULL;
} else {
- lttng_waiter_wake_up(&cmd->reply_waiter);
+ lttng_waiter_wake(&cmd->reply_waiter);
}
return ret;
error_unlock:
/* Wake-up and return a fatal error to the calling thread. */
- lttng_waiter_wake_up(&cmd->reply_waiter);
+ lttng_waiter_wake(&cmd->reply_waiter);
cmd->reply_code = LTTNG_ERR_FATAL;
error:
/* Indicate a fatal error to the caller. */
goto error;
}
+ client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
ret = lttng_poll_add(&state->events, client->socket,
- LPOLLIN | LPOLLERR |
- LPOLLHUP | LPOLLRDHUP);
+ client->communication.current_poll_events);
if (ret < 0) {
ERR("Failed to add notification channel client socket to poll set");
ret = 0;
return error_occurred ? -1 : 0;
}
+/*
+ * Check whether a client still has queued outgoing data.
+ *
+ * Returns true when the client's outbound payload holds unsent bytes or
+ * unsent fd handles. The client lock is assumed to be held by the
+ * caller since the outbound payload is sampled.
+ */
+static
+bool client_has_outbound_data_left(
+		const struct notification_client *client)
+{
+	const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+			&client->communication.outbound.payload, 0, -1);
+	const bool has_data = pv.buffer.size != 0;
+	const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+	return has_data || has_fds;
+}
+
static
int client_handle_transmission_status(
struct notification_client *client,
switch (transmission_status) {
case CLIENT_TRANSMISSION_STATUS_COMPLETE:
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN);
- if (ret) {
- goto end;
- }
-
- break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
+ {
+ int current_poll_events;
+ int new_poll_events;
/*
* We want to be notified whenever there is buffer space
- * available to send the rest of the payload.
+ * available to send the rest of the payload if we are
+ * waiting to send data to the client.
+ *
+ * The state of the outbound queue being sampled here is
+ * fine since:
+ * - it is okay to wake-up "for nothing" in case we see
+ * that data is left, but another thread succeeds in
+ * flushing it before us when handling the client "out"
+ * event. We will simply stop monitoring that event the next
+ * time it wakes us up and we see no data left to be sent,
+ * - if another thread fails to flush the entire client
+ * outgoing queue, it will issue a "communication update"
+ * command and cause the client's (e)poll mask to be
+ * re-evaluated.
+ *
+ * The situation we seek to avoid would be to disable the
+ * monitoring of "out" client events indefinitely when there is
+ * data to be sent, which can't happen because of the
+ * aforementioned "communication update" mechanism.
*/
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN_OUT);
- if (ret) {
- goto end;
+ pthread_mutex_lock(&client->lock);
+ current_poll_events = client->communication.current_poll_events;
+ new_poll_events = client_has_outbound_data_left(client) ?
+ CLIENT_POLL_EVENTS_IN_OUT :
+ CLIENT_POLL_EVENTS_IN;
+ client->communication.current_poll_events = new_poll_events;
+ pthread_mutex_unlock(&client->lock);
+
+ /* Update the monitored event set only if it changed. */
+ if (current_poll_events != new_poll_events) {
+ ret = lttng_poll_mod(&state->events, client->socket,
+ new_poll_events);
+ if (ret) {
+ goto end;
+ }
}
+
break;
+ }
case CLIENT_TRANSMISSION_STATUS_FAIL:
ret = notification_thread_client_disconnect(client, state);
if (ret) {
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
-static
-bool client_has_outbound_data_left(
- const struct notification_client *client)
-{
- const struct lttng_payload_view pv = lttng_payload_view_from_payload(
- &client->communication.outbound.payload, 0, -1);
- const bool has_data = pv.buffer.size != 0;
- const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
- return has_data || has_fds;
-}
-
/* Client lock must _not_ be held by the caller. */
static
int client_send_command_reply(struct notification_client *client,
&client->communication.inbound.creds);
client->gid = LTTNG_SOCK_GET_GID_CRED(
&client->communication.inbound.creds);
- DBG("Received handshake from client (uid = %u, gid = %u) with version %i.%i",
+ client->is_sessiond = LTTNG_SOCK_GET_PID_CRED(&client->communication.inbound.creds) == getpid();
+ DBG("Received handshake from client: uid = %u, gid = %u, protocol version = %i.%i, client is sessiond = %s",
client->uid, client->gid, (int) client->major,
- (int) client->minor);
+ (int) client->minor,
+ client->is_sessiond ? "true" : "false");
if (handshake_client->major !=
LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
}
pthread_mutex_lock(&client->lock);
- transmission_status = client_flush_outgoing_queue(client);
+ if (!client_has_outbound_data_left(client)) {
+ /*
+ * A client "out" event can be received when no payload is left
+ * to send under some circumstances.
+ *
+ * Many threads can flush a client's outgoing queue and, if they
+ * had to queue their message (socket was full), will use the
+ * "communication update" command to signal the (e)poll thread
+ * to monitor for space being made available in the socket.
+ *
+ * Commands are sent over an internal pipe serviced by the same
+ * thread as the client sockets.
+ *
+ * When space is made available in the socket, there is a race
+ * between the (e)poll thread and the other threads that may
+ * wish to use the client's socket to flush its outgoing queue.
+ *
+ * A non-(e)poll thread may attempt (and succeed) in flushing
+ * the queue before the (e)poll thread gets a chance to service
+ * the client's "out" event.
+ *
+ * In this situation, the (e)poll thread processing the client
+ * out event will see an empty payload: there is nothing to do
+ * except unsubscribing (e)poll "out" events.
+ *
+ * Note that this thread is the (e)poll thread so it can modify
+ * the (e)poll mask directly without using a communication
+ * update command. Other threads that flush the outgoing queue
+ * will use the "communication update" command to wake up this
+ * thread and force it to monitor "out" events.
+ *
+ * When other threads succeed in emptying the outgoing queue,
+ * they don't need to update the (e)poll mask: if the "out"
+ * event is monitored, it will fire once and the (e)poll
+ * thread will reach this condition, causing the event to
+ * stop being monitored.
+ */
+ transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ } else {
+ transmission_status = client_flush_outgoing_queue(client);
+ }
pthread_mutex_unlock(&client->lock);
ret = client_handle_transmission_status(
goto skip_client;
}
+ if (lttng_trigger_is_hidden(trigger) && !client->is_sessiond) {
+ /*
+		 * Notifications resulting from a hidden trigger are
+		 * only sent to the session daemon.
+ */
+ DBG("Skipping client as the trigger is hidden and the client is not the session daemon");
+ goto skip_client;
+ }
+
if (source_object_creds) {
if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
client->gid != lttng_credentials_get_gid(source_object_creds) &&
notification->capture_buf_size, false);
if (evaluation == NULL) {
- ERR("Failed to create event rule hit evaluation while creating and enqueuing action executor job");
+ ERR("Failed to create event rule matches evaluation while creating and enqueuing action executor job");
ret = -1;
goto end_unlock;
}
+
client_list = get_client_list_from_condition(state,
lttng_trigger_get_const_condition(element->trigger));
executor_status = action_executor_enqueue_trigger(state->executor,