Fix: sessiond: size-based rotation threshold exceeded in per-pid tracing (2/2)
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 28 Jun 2022 03:36:22 +0000 (23:36 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 4 Jul 2022 15:43:43 +0000 (11:43 -0400)
For a complete description of the original problem, refer to the previous
commit.

This change implements the second part of the fix.

Buffer statistic samples are augmented to include the channel's session
id. Since a session can outlive its channels (on the session daemon
side), the consumed size conditions are now bound to the session.

This means that the "total consumed" state is now part of the
session_info structure exclusively which, overall, is cleaner.

A side-effect of this change is that consumed size conditions are now
also evaluated when a trigger is registered or when a client subscribes
to it via a notification channel instead of waiting until the next
monitoring sample.

The buffer statistics sample also expresses a "consumed size" that is
relative to the last sample that was successfully sent.

Finally, the consumer daemon sends a final buffer statistics sample when
a channel is torn down. As explained in more detail in the previous
commit, this makes the accounting of per-pid sessions more reliable when
short-live applications are traced.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I322b9f20977e59c63cf2a4254c97c4ee657e933e

src/bin/lttng-sessiond/notification-thread-events.cpp
src/bin/lttng-sessiond/notification-thread-internal.hpp
src/common/consumer/consumer-stream.cpp
src/common/consumer/consumer-timer.cpp
src/common/consumer/consumer.cpp
src/common/consumer/consumer.hpp
src/common/sessiond-comm/sessiond-comm.hpp

index 586a9900b641419b39a0729dfa16e00459a7fed0..1e21fffe7bfb689525b082a1492552021f861108 100644 (file)
@@ -33,6 +33,7 @@
 #include <lttng/notification/channel-internal.hpp>
 #include <lttng/trigger/trigger-internal.hpp>
 #include <lttng/event-rule/event-rule-internal.hpp>
+#include <lttng/location-internal.hpp>
 
 #include <time.h>
 #include <unistd.h>
@@ -80,11 +81,7 @@ struct lttng_channel_trigger_list {
  *   - lttng_session_trigger_list_add()
  */
 struct lttng_session_trigger_list {
-       /*
-        * Not owned by this; points to the session_info structure's
-        * session name.
-        */
-       const char *session_name;
+       char *session_name;
        /* List of struct lttng_trigger_list_element. */
        struct cds_list_head list;
        /* Node in the session_triggers_ht */
@@ -137,7 +134,6 @@ struct channel_state_sample {
        struct cds_lfht_node channel_state_ht_node;
        uint64_t highest_usage;
        uint64_t lowest_usage;
-       uint64_t channel_total_consumed;
        /* call_rcu delayed reclaim. */
        struct rcu_head rcu_node;
 };
@@ -149,8 +145,6 @@ static int evaluate_buffer_condition(const struct lttng_condition *condition,
                const struct notification_thread_state *state,
                const struct channel_state_sample *previous_sample,
                const struct channel_state_sample *latest_sample,
-               uint64_t previous_session_consumed_total,
-               uint64_t latest_session_consumed_total,
                struct channel_info *channel_info);
 static
 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
@@ -530,10 +524,10 @@ enum lttng_object_type get_condition_binding_object(
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
                return LTTNG_OBJECT_TYPE_CHANNEL;
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
                return LTTNG_OBJECT_TYPE_SESSION;
        case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
                return LTTNG_OBJECT_TYPE_NONE;
@@ -593,6 +587,7 @@ void session_info_destroy(void *_data)
                        &session_info->sessions_ht_node);
        rcu_read_unlock();
        free(session_info->name);
+       lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
        call_rcu(&session_info->rcu_node, free_session_info_rcu);
 }
 
@@ -964,7 +959,6 @@ int evaluate_channel_condition_for_client(
 
        ret = evaluate_buffer_condition(condition, evaluation, state,
                        NULL, last_sample,
-                       0, channel_info->session_info->consumed_data_size,
                        channel_info);
        if (ret) {
                WARN("Fatal error occurred while evaluating a newly subscribed-to condition");
@@ -1011,62 +1005,141 @@ end:
 }
 
 static
-int evaluate_session_condition_for_client(
+bool evaluate_session_rotation_ongoing_condition(const struct lttng_condition *condition
+               __attribute__((unused)),
+               const struct session_state_sample *sample)
+{
+       return sample->rotation.ongoing;
+}
+
+static
+bool evaluate_session_consumed_size_condition(
                const struct lttng_condition *condition,
-               struct notification_thread_state *state,
-               struct lttng_evaluation **evaluation,
-               uid_t *session_uid, gid_t *session_gid)
+               const struct session_state_sample *sample)
+{
+       uint64_t threshold;
+       const struct lttng_condition_session_consumed_size *size_condition =
+                       lttng::utils::container_of(condition,
+                               &lttng_condition_session_consumed_size::parent);
+
+       threshold = size_condition->consumed_threshold_bytes.value;
+       DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+                       threshold, sample->consumed_data_size);
+       return sample->consumed_data_size >= threshold;
+}
+
+/*
+ * `new_state` can be NULL to indicate that we are not evaluating a
+ * state transition. A client subscribed or a trigger was registered and
+ * we wish to perform an initial evaluation.
+ */
+static
+int evaluate_session_condition(
+               const struct lttng_condition *condition,
+               const struct session_info *session_info,
+               const struct session_state_sample *new_state,
+               struct lttng_evaluation **evaluation)
 {
        int ret;
-       const char *session_name;
-       struct session_info *session_info = NULL;
+       bool previous_result, newest_result;
 
-       rcu_read_lock();
-       session_name = get_condition_session_name(condition);
+       switch (lttng_condition_get_type(condition)) {
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
+               if (new_state) {
+                       previous_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, &session_info->last_state_sample);
+                       newest_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, new_state);
+               } else {
+                       previous_result = false;
+                       newest_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, &session_info->last_state_sample);
+               }
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               if (new_state) {
+                       previous_result = evaluate_session_consumed_size_condition(
+                                       condition, &session_info->last_state_sample);
+                       newest_result = evaluate_session_consumed_size_condition(
+                                       condition, new_state);
+               } else {
+                       previous_result = false;
+                       newest_result = evaluate_session_consumed_size_condition(
+                                       condition, &session_info->last_state_sample);
+               }
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+               /*
+                * Note that LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED is
+                * evaluated differently to only consider state transitions without regard for the
+                * initial state. This is a deliberate choice as it is unlikely that a user would
+                * expect an action to occur for a rotation that occurred long before the trigger or
+                * subscription occurred.
+                */
+               if (!new_state) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Find the session associated with the condition. */
-       session_info = get_session_info_by_name(state, session_name);
-       if (!session_info) {
-               DBG("Unknown session while evaluating session condition for client: name = `%s`",
-                               session_name);
+               previous_result = !session_info->last_state_sample.rotation.ongoing;
+               newest_result = !new_state->rotation.ongoing;
+               break;
+       default:
+               ret = 0;
+               goto end;
+       }
+
+       if (!newest_result || (previous_result == newest_result)) {
+               /* Not a state transition, evaluate to false. */
                ret = 0;
                goto end;
        }
 
-       /*
-        * Evaluation is performed in-line here since only one type of
-        * session-bound condition is handled for the moment.
-        */
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
-               if (!session_info->rotation.ongoing) {
-                       ret = 0;
-                       goto end_session_put;
-               }
+       {
+               const auto rotation_id = new_state ?
+                               new_state->rotation.id :
+                                     session_info->last_state_sample.rotation.id;
 
-               *evaluation = lttng_evaluation_session_rotation_ongoing_create(
-                               session_info->rotation.id);
-               if (!*evaluation) {
-                       /* Fatal error. */
-                       ERR("Failed to create session rotation ongoing evaluation for session \"%s\"",
-                                       session_info->name);
-                       ret = -1;
-                       goto end_session_put;
-               }
-               ret = 0;
+               *evaluation = lttng_evaluation_session_rotation_ongoing_create(rotation_id);
+               break;
+       }
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       {
+               const auto rotation_id = new_state ?
+                               new_state->rotation.id :
+                                     session_info->last_state_sample.rotation.id;
+
+               /* Callee acquires a reference to location. */
+               *evaluation = lttng_evaluation_session_rotation_completed_create(
+                               rotation_id, new_state->rotation.location);
+               break;
+       }
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+       {
+               const auto latest_session_consumed_total = new_state ?
+                               new_state->consumed_data_size :
+                                     session_info->last_state_sample.consumed_data_size;
+
+               *evaluation = lttng_evaluation_session_consumed_size_create(
+                               latest_session_consumed_total);
                break;
+       }
        default:
-               ret = 0;
-               goto end_session_put;
+               abort();
        }
 
-       *session_uid = session_info->uid;
-       *session_gid = session_info->gid;
+       if (!*evaluation) {
+               /* Fatal error. */
+               ERR("Failed to create session condition evaluation: session name = `%s`",
+                               session_info->name);
+               ret = -1;
+               goto end;
+       }
 
-end_session_put:
-       session_info_put(session_info);
+       ret = 0;
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1099,9 +1172,25 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger,
 
        switch (get_condition_binding_object(condition)) {
        case LTTNG_OBJECT_TYPE_SESSION:
-               ret = evaluate_session_condition_for_client(condition, state,
-                               &evaluation, &object_uid, &object_gid);
+       {
+               /* Find the session associated with the condition. */
+               const auto *session_name = get_condition_session_name(condition);
+               auto session_info = get_session_info_by_name(state, session_name);
+               if (!session_info) {
+                       /* Not an error, the session doesn't exist yet. */
+                       DBG("Session not found while evaluating session condition for client: session name = `%s`",
+                                       session_name);
+                       ret = 0;
+                       goto end;
+               }
+
+               object_uid = session_info->uid;
+               object_gid = session_info->gid;
+
+               ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+               session_info_put(session_info);
                break;
+       }
        case LTTNG_OBJECT_TYPE_CHANNEL:
                ret = evaluate_channel_condition_for_client(condition, state,
                                &evaluation, &object_uid, &object_gid);
@@ -1453,27 +1542,6 @@ fail:
        return false;
 }
 
-static
-bool session_consumed_size_condition_applies_to_channel(
-               const struct lttng_condition *condition,
-               const struct channel_info *channel_info)
-{
-       enum lttng_condition_status status;
-       const char *condition_session_name = NULL;
-
-       status = lttng_condition_session_consumed_size_get_session_name(
-                       condition, &condition_session_name);
-       LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
-
-       if (strcmp(channel_info->session_info->name, condition_session_name)) {
-               goto fail;
-       }
-
-       return true;
-fail:
-       return false;
-}
-
 static
 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
                const struct channel_info *channel_info)
@@ -1492,10 +1560,6 @@ bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
                trigger_applies = buffer_usage_condition_applies_to_channel(
                                condition, channel_info);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               trigger_applies = session_consumed_size_condition_applies_to_channel(
-                               condition, channel_info);
-               break;
        default:
                goto fail;
        }
@@ -1543,23 +1607,27 @@ end:
 /*
  * Allocate an empty lttng_session_trigger_list for the session named
  * 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
  */
 static
 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
                const char *session_name,
                struct cds_lfht *session_triggers_ht)
 {
-       struct lttng_session_trigger_list *list;
+       struct lttng_session_trigger_list *list = NULL;
+       char *session_name_copy = strdup(session_name);
+
+       if (!session_name_copy) {
+               PERROR("Failed to allocate session name while building trigger list");
+               goto end;
+       }
 
        list = zmalloc<lttng_session_trigger_list>();
        if (!list) {
+               PERROR("Failed to allocate session trigger list while building trigger list");
                goto end;
        }
-       list->session_name = session_name;
+
+       list->session_name = session_name_copy;
        CDS_INIT_LIST_HEAD(&list->list);
        cds_lfht_node_init(&list->session_triggers_ht_node);
        list->session_triggers_ht = session_triggers_ht;
@@ -1577,8 +1645,11 @@ end:
 static
 void free_session_trigger_list_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct lttng_session_trigger_list,
-                       rcu_node));
+       struct lttng_session_trigger_list *list =
+                       caa_container_of(node, struct lttng_session_trigger_list, rcu_node);
+
+       free(list->session_name);
+       free(list);
 }
 
 static
@@ -1630,17 +1701,11 @@ bool trigger_applies_to_session(const struct lttng_trigger *trigger,
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
        {
-               enum lttng_condition_status condition_status;
                const char *condition_session_name;
 
-               condition_status = lttng_condition_session_rotation_get_session_name(
-                       condition, &condition_session_name);
-               if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-                       ERR("Failed to retrieve session rotation condition's session name");
-                       goto end;
-               }
-
+               condition_session_name = get_condition_session_name(condition);
                LTTNG_ASSERT(condition_session_name);
                applies = !strcmp(condition_session_name, session_name);
                break;
@@ -1655,10 +1720,6 @@ end:
 /*
  * Allocate and initialize an lttng_session_trigger_list which contains
  * all triggers that apply to the session named 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
  */
 static
 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
@@ -2011,6 +2072,7 @@ int handle_notification_thread_command_session_rotation(
        struct lttng_trigger_list_element *trigger_list_element;
        struct session_info *session_info;
        struct lttng_credentials session_creds;
+       struct session_state_sample new_session_state;
 
        rcu_read_lock();
 
@@ -2024,20 +2086,25 @@ int handle_notification_thread_command_session_rotation(
                goto end;
        }
 
+       new_session_state = session_info->last_state_sample;
+       if (location) {
+               lttng_trace_archive_location_get(location);
+               new_session_state.rotation.location = location;
+       } else {
+               new_session_state.rotation.location = NULL;
+       }
+
        session_creds = {
                .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid),
                .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid),
        };
 
-       session_info->rotation.ongoing =
-                       cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
-       session_info->rotation.id = trace_archive_chunk_id;
+       new_session_state.rotation.ongoing = cmd_type ==
+                       NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
+       new_session_state.rotation.id = trace_archive_chunk_id;
+
        trigger_list = get_session_trigger_list(state, session_info->name);
-       if (!trigger_list) {
-               DBG("No triggers apply to session: session name = `%s` ",
-                               session_info->name);
-               goto end;
-       }
+       LTTNG_ASSERT(trigger_list);
 
        cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
                        node) {
@@ -2045,45 +2112,34 @@ int handle_notification_thread_command_session_rotation(
                struct lttng_trigger *trigger;
                struct notification_client_list *client_list;
                struct lttng_evaluation *evaluation = NULL;
-               enum lttng_condition_type condition_type;
                enum action_executor_status executor_status;
 
                trigger = trigger_list_element->trigger;
                condition = lttng_trigger_get_const_condition(trigger);
                LTTNG_ASSERT(condition);
-               condition_type = lttng_condition_get_type(condition);
 
-               if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
-                               cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
-                       continue;
-               } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
-                               cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
-                       continue;
-               }
-
-               client_list = get_client_list_from_condition(state, condition);
-               if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
-                       evaluation = lttng_evaluation_session_rotation_ongoing_create(
-                                       trace_archive_chunk_id);
-               } else {
-                       evaluation = lttng_evaluation_session_rotation_completed_create(
-                                       trace_archive_chunk_id, location);
+               ret = evaluate_session_condition(
+                               condition, session_info, &new_session_state, &evaluation);
+               if (ret) {
+                       ret = -1;
+                       cmd_result = LTTNG_ERR_NOMEM;
+                       goto end;
                }
 
                if (!evaluation) {
-                       /* Internal error */
-                       ret = -1;
-                       cmd_result = LTTNG_ERR_UNK;
-                       goto put_list;
+                       continue;
                }
 
                /*
                 * Ownership of `evaluation` transferred to the action executor
-                * no matter the result.
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
                 */
+               client_list = get_client_list_from_condition(state, condition);
                executor_status = action_executor_enqueue_trigger(
                                state->executor, trigger, evaluation,
                                &session_creds, client_list);
+               notification_client_list_put(client_list);
                evaluation = NULL;
                switch (executor_status) {
                case ACTION_EXECUTOR_STATUS_OK:
@@ -2096,7 +2152,7 @@ int handle_notification_thread_command_session_rotation(
                         */
                        ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
                        ret = -1;
-                       goto put_list;
+                       goto end;
                case ACTION_EXECUTOR_STATUS_OVERFLOW:
                        /*
                         * TODO Add trigger identification (name/id) when
@@ -2106,18 +2162,19 @@ int handle_notification_thread_command_session_rotation(
                         */
                        WARN("No space left when enqueuing action associated with session rotation trigger");
                        ret = 0;
-                       goto put_list;
+                       goto end;
                default:
                        abort();
                }
-
-put_list:
-               notification_client_list_put(client_list);
-               if (caa_unlikely(ret)) {
-                       break;
-               }
        }
+
 end:
+       if (session_info) {
+               /* Ownership of new_session_state::location is transferred. */
+               lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
+               session_info->last_state_sample = new_session_state;
+       }
+
        session_info_put(session_info);
        *_cmd_result = cmd_result;
        rcu_read_unlock();
@@ -2536,25 +2593,7 @@ int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
        ASSERT_RCU_READ_LOCKED();
 
        condition = lttng_trigger_get_const_condition(trigger);
-       switch (lttng_condition_get_type(condition)) {
-       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
-       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
-       {
-               enum lttng_condition_status status;
-
-               status = lttng_condition_session_rotation_get_session_name(
-                               condition, &session_name);
-               if (status != LTTNG_CONDITION_STATUS_OK) {
-                       ERR("Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
-                       ret = -1;
-                       goto end;
-               }
-               break;
-       }
-       default:
-               ret = -1;
-               goto end;
-       }
+       session_name = get_condition_session_name(condition);
 
        trigger_list = get_session_trigger_list(state, session_name);
        if (!trigger_list) {
@@ -3001,12 +3040,25 @@ int handle_notification_thread_command_register_trigger(
         */
        switch (get_condition_binding_object(condition)) {
        case LTTNG_OBJECT_TYPE_SESSION:
-               ret = evaluate_session_condition_for_client(condition, state,
-                               &evaluation, &object_uid,
-                               &object_gid);
-               LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
-               LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
+       {
+               /* Find the session associated with the condition. */
+               const auto *session_name = get_condition_session_name(condition);
+               auto session_info = get_session_info_by_name(state, session_name);
+               if (!session_info) {
+                       /* Not an error, the session doesn't exist yet. */
+                       DBG("Session not found while evaluating session condition during registration of trigger: session name = `%s`",
+                                       session_name);
+                       ret = 0;
+                       goto success;
+               }
+
+               LTTNG_OPTIONAL_SET(&object_creds.uid, session_info->uid);
+               LTTNG_OPTIONAL_SET(&object_creds.gid, session_info->gid);
+
+               ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+               session_info_put(session_info);
                break;
+       }
        case LTTNG_OBJECT_TYPE_CHANNEL:
                ret = evaluate_channel_condition_for_client(condition, state,
                                &evaluation, &object_uid,
@@ -3139,6 +3191,35 @@ void teardown_tracer_notifier(struct notification_thread_state *state,
        }
 }
 
+static
+void remove_trigger_from_session_trigger_list(
+       struct lttng_session_trigger_list *trigger_list,
+       const struct lttng_trigger *trigger)
+{
+       bool found = false;
+       struct lttng_trigger_list_element *trigger_element, *tmp;
+
+       cds_list_for_each_entry_safe (trigger_element, tmp, &trigger_list->list, node) {
+               if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+                       continue;
+               }
+
+               DBG("Removed trigger from session_triggers_ht");
+               cds_list_del(&trigger_element->node);
+               free(trigger_element);
+               /* A trigger can only appear once per session. */
+               found = true;
+               break;
+       }
+
+       if (!found) {
+               ERR("Failed to find trigger associated with session: session name = `%s`",
+                               trigger_list->session_name);
+       }
+
+       LTTNG_ASSERT(found);
+}
+
 static
 int handle_notification_thread_command_unregister_trigger(
                struct notification_thread_state *state,
@@ -3147,7 +3228,6 @@ int handle_notification_thread_command_unregister_trigger(
 {
        struct cds_lfht_iter iter;
        struct cds_lfht_node *triggers_ht_node;
-       struct lttng_channel_trigger_list *trigger_list;
        struct notification_client_list *client_list;
        struct lttng_trigger_ht_element *trigger_ht_element = NULL;
        const struct lttng_condition *condition = lttng_trigger_get_const_condition(
@@ -3172,22 +3252,56 @@ int handle_notification_thread_command_unregister_trigger(
        trigger_ht_element = caa_container_of(triggers_ht_node,
                        struct lttng_trigger_ht_element, node);
 
-       /* Remove trigger from channel_triggers_ht. */
-       cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
-                       channel_triggers_ht_node) {
-               struct lttng_trigger_list_element *trigger_element, *tmp;
+       switch (get_condition_binding_object(condition)) {
+       case LTTNG_OBJECT_TYPE_CHANNEL:
+       {
+               struct lttng_channel_trigger_list *trigger_list;
 
-               cds_list_for_each_entry_safe(trigger_element, tmp,
-                               &trigger_list->list, node) {
-                       if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
-                               continue;
+               /*
+                * Remove trigger from channel_triggers_ht.
+                *
+                * Note that multiple channels may have matched the trigger's
+                * condition (e.g. all instances of a given channel in per-pid buffering
+                * mode).
+                *
+                * Iterate on all lists since we don't know the target channels' keys.
+                */
+               cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
+                               channel_triggers_ht_node) {
+                       struct lttng_trigger_list_element *trigger_element, *tmp;
+
+                       cds_list_for_each_entry_safe(
+                                       trigger_element, tmp, &trigger_list->list, node) {
+                               if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+                                       continue;
+                               }
+
+                               DBG("Removed trigger from channel_triggers_ht");
+                               cds_list_del(&trigger_element->node);
+                               /* A trigger can only appear once per channel */
+                               break;
                        }
+               }
+               break;
+       }
+       case LTTNG_OBJECT_TYPE_SESSION:
+       {
+               auto session = get_session_info_by_name(
+                               state, get_condition_session_name(condition));
 
-                       DBG("Removed trigger from channel_triggers_ht");
-                       cds_list_del(&trigger_element->node);
-                       /* A trigger can only appear once per channel */
+               /* Session doesn't exist, no trigger to remove. */
+               if (!session) {
                        break;
                }
+
+               auto session_trigger_list = get_session_trigger_list(state, session->name);
+               remove_trigger_from_session_trigger_list(session_trigger_list, trigger);
+               session_info_put(session);
+       }
+       case LTTNG_OBJECT_TYPE_NONE:
+               break;
+       default:
+               abort();
        }
 
        if (lttng_trigger_needs_tracer_notifier(trigger)) {
@@ -4365,30 +4479,12 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
        return result;
 }
 
-static
-bool evaluate_session_consumed_size_condition(
-               const struct lttng_condition *condition,
-               uint64_t session_consumed_size)
-{
-       uint64_t threshold;
-       const struct lttng_condition_session_consumed_size *size_condition =
-                       lttng::utils::container_of(condition,
-                               &lttng_condition_session_consumed_size::parent);
-
-       threshold = size_condition->consumed_threshold_bytes.value;
-       DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
-                       threshold, session_consumed_size);
-       return session_consumed_size >= threshold;
-}
-
 static
 int evaluate_buffer_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
                const struct notification_thread_state *state __attribute__((unused)),
                const struct channel_state_sample *previous_sample,
                const struct channel_state_sample *latest_sample,
-               uint64_t previous_session_consumed_total,
-               uint64_t latest_session_consumed_total,
                struct channel_info *channel_info)
 {
        int ret = 0;
@@ -4411,18 +4507,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition,
                                condition, latest_sample,
                                channel_info->capacity);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               if (caa_likely(previous_sample_available)) {
-                       previous_sample_result =
-                               evaluate_session_consumed_size_condition(
-                                       condition,
-                                       previous_session_consumed_total);
-               }
-               latest_sample_result =
-                               evaluate_session_consumed_size_condition(
-                                       condition,
-                                       latest_session_consumed_total);
-               break;
        default:
                /* Unknown condition type; internal error. */
                abort();
@@ -4451,10 +4535,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition,
                                latest_sample->highest_usage,
                                channel_info->capacity);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               *evaluation = lttng_evaluation_session_consumed_size_create(
-                               latest_session_consumed_total);
-               break;
        default:
                abort();
        }
@@ -4983,15 +5063,18 @@ int handle_notification_thread_channel_sample(
 {
        int ret = 0;
        struct lttcomm_consumer_channel_monitor_msg sample_msg;
-       struct channel_info *channel_info;
+       struct channel_info *channel_info = NULL;
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
-       struct lttng_channel_trigger_list *trigger_list;
+       struct lttng_channel_trigger_list *channel_trigger_list;
+       struct lttng_session_trigger_list *session_trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        bool previous_sample_available = false;
-       struct channel_state_sample previous_sample, latest_sample;
-       uint64_t previous_session_consumed_total, latest_session_consumed_total;
-       struct lttng_credentials channel_creds;
+       struct channel_state_sample channel_previous_sample, channel_new_sample;
+       struct session_state_sample session_new_sample;
+       struct lttng_credentials channel_creds = {};
+       struct lttng_credentials session_creds = {};
+       struct session_info *session;
 
        /*
         * The monitoring pipe only holds messages smaller than PIPE_BUF,
@@ -5006,19 +5089,95 @@ int handle_notification_thread_channel_sample(
        }
 
        ret = 0;
-       latest_sample.key.key = sample_msg.key;
-       latest_sample.key.domain = domain;
-       latest_sample.highest_usage = sample_msg.highest;
-       latest_sample.lowest_usage = sample_msg.lowest;
-       latest_sample.channel_total_consumed = sample_msg.total_consumed;
+       channel_new_sample.key.key = sample_msg.key;
+       channel_new_sample.key.domain = domain;
+       channel_new_sample.highest_usage = sample_msg.highest;
+       channel_new_sample.lowest_usage = sample_msg.lowest;
 
        rcu_read_lock();
 
+       session = get_session_info_by_id(state, sample_msg.session_id);
+       if (!session) {
+               DBG("Received a sample for an unknown session from consumerd: session id = %" PRIu64,
+                               sample_msg.session_id);
+               goto end_unlock;
+       }
+
+       session_new_sample = session->last_state_sample;
+       session_new_sample.consumed_data_size += sample_msg.consumed_since_last_sample;
+       session_creds = {
+               .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
+               .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
+       };
+
+       session_trigger_list = get_session_trigger_list(state, session->name);
+       LTTNG_ASSERT(session_trigger_list);
+       cds_list_for_each_entry(trigger_list_element, &session_trigger_list->list,
+                       node) {
+               const struct lttng_condition *condition;
+               struct lttng_trigger *trigger;
+               struct notification_client_list *client_list = NULL;
+               struct lttng_evaluation *evaluation = NULL;
+               enum action_executor_status executor_status;
+
+               ret = 0;
+               trigger = trigger_list_element->trigger;
+               condition = lttng_trigger_get_const_condition(trigger);
+               LTTNG_ASSERT(condition);
+
+               ret = evaluate_session_condition(
+                               condition, session, &session_new_sample, &evaluation);
+               if (caa_unlikely(ret)) {
+                       break;
+               }
+
+               if (caa_likely(!evaluation)) {
+                       continue;
+               }
+
+               /*
+                * Ownership of `evaluation` transferred to the action executor
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
+                */
+               client_list = get_client_list_from_condition(state, condition);
+               executor_status = action_executor_enqueue_trigger(
+                               state->executor, trigger, evaluation,
+                               &session_creds, client_list);
+               notification_client_list_put(client_list);
+               evaluation = NULL;
+               switch (executor_status) {
+               case ACTION_EXECUTOR_STATUS_OK:
+                       break;
+               case ACTION_EXECUTOR_STATUS_ERROR:
+               case ACTION_EXECUTOR_STATUS_INVALID:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        */
+                       ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
+                       ret = -1;
+                       goto end_unlock;
+               case ACTION_EXECUTOR_STATUS_OVERFLOW:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        *
+                        * Not a fatal error.
+                        */
+                       WARN("No space left when enqueuing action associated with buffer-condition trigger");
+                       ret = 0;
+                       goto end_unlock;
+               default:
+                       abort();
+               }
+       }
+
        /* Retrieve the channel's informations */
        cds_lfht_lookup(state->channels_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_info,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (caa_unlikely(!node)) {
@@ -5029,28 +5188,26 @@ int handle_notification_thread_channel_sample(
                 * sample.
                 */
                DBG("Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
-                               latest_sample.key.key,
+                               channel_new_sample.key.key,
                                lttng_domain_type_str(domain));
                goto end_unlock;
        }
+
        channel_info = caa_container_of(node, struct channel_info,
                        channels_ht_node);
-       DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
+       DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", consumed since last sample = %" PRIu64")",
                        channel_info->name,
-                       latest_sample.key.key,
+                       channel_new_sample.key.key,
                        channel_info->session_info->name,
-                       latest_sample.highest_usage,
-                       latest_sample.lowest_usage,
-                       latest_sample.channel_total_consumed);
-
-       previous_session_consumed_total =
-                       channel_info->session_info->consumed_data_size;
+                       channel_new_sample.highest_usage,
+                       channel_new_sample.lowest_usage,
+                       sample_msg.consumed_since_last_sample);
 
        /* Retrieve the channel's last sample, if it exists, and update it. */
        cds_lfht_lookup(state->channel_state_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_state_sample,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (caa_likely(node)) {
@@ -5061,16 +5218,11 @@ int handle_notification_thread_channel_sample(
                                struct channel_state_sample,
                                channel_state_ht_node);
 
-               memcpy(&previous_sample, stored_sample,
-                               sizeof(previous_sample));
-               stored_sample->highest_usage = latest_sample.highest_usage;
-               stored_sample->lowest_usage = latest_sample.lowest_usage;
-               stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
+               memcpy(&channel_previous_sample, stored_sample,
+                               sizeof(channel_previous_sample));
+               stored_sample->highest_usage = channel_new_sample.highest_usage;
+               stored_sample->lowest_usage = channel_new_sample.lowest_usage;
                previous_sample_available = true;
-
-               latest_session_consumed_total =
-                               previous_session_consumed_total +
-                               (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
        } else {
                /*
                 * This is the channel's first sample, allocate space for and
@@ -5084,39 +5236,30 @@ int handle_notification_thread_channel_sample(
                        goto end_unlock;
                }
 
-               memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
+               memcpy(stored_sample, &channel_new_sample, sizeof(*stored_sample));
                cds_lfht_node_init(&stored_sample->channel_state_ht_node);
                cds_lfht_add(state->channel_state_ht,
                                hash_channel_key(&stored_sample->key),
                                &stored_sample->channel_state_ht_node);
-
-               latest_session_consumed_total =
-                               previous_session_consumed_total +
-                               latest_sample.channel_total_consumed;
        }
 
-       channel_info->session_info->consumed_data_size =
-                       latest_session_consumed_total;
-
        /* Find triggers associated with this channel. */
        cds_lfht_lookup(state->channel_triggers_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_trigger_list,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
-       if (caa_likely(!node)) {
-               goto end_unlock;
-       }
+       LTTNG_ASSERT(node);
 
        channel_creds = (typeof(channel_creds)) {
                .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
                .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
        };
 
-       trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
+       channel_trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
                        channel_triggers_ht_node);
-       cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
+       cds_list_for_each_entry(trigger_list_element, &channel_trigger_list->list,
                        node) {
                const struct lttng_condition *condition;
                struct lttng_trigger *trigger;
@@ -5129,33 +5272,28 @@ int handle_notification_thread_channel_sample(
                condition = lttng_trigger_get_const_condition(trigger);
                LTTNG_ASSERT(condition);
 
-               /*
-                * Check if any client is subscribed to the result of this
-                * evaluation.
-                */
-               client_list = get_client_list_from_condition(state, condition);
-
                ret = evaluate_buffer_condition(condition, &evaluation, state,
-                               previous_sample_available ? &previous_sample : NULL,
-                               &latest_sample,
-                               previous_session_consumed_total,
-                               latest_session_consumed_total,
+                               previous_sample_available ? &channel_previous_sample : NULL,
+                               &channel_new_sample,
                                channel_info);
                if (caa_unlikely(ret)) {
-                       goto put_list;
+                       break;
                }
 
                if (caa_likely(!evaluation)) {
-                       goto put_list;
+                       continue;
                }
 
                /*
                 * Ownership of `evaluation` transferred to the action executor
-                * no matter the result.
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
                 */
+               client_list = get_client_list_from_condition(state, condition);
                executor_status = action_executor_enqueue_trigger(
                                state->executor, trigger, evaluation,
                                &channel_creds, client_list);
+               notification_client_list_put(client_list);
                evaluation = NULL;
                switch (executor_status) {
                case ACTION_EXECUTOR_STATUS_OK:
@@ -5168,7 +5306,7 @@ int handle_notification_thread_channel_sample(
                         */
                        ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
                        ret = -1;
-                       goto put_list;
+                       goto end_unlock;
                case ACTION_EXECUTOR_STATUS_OVERFLOW:
                        /*
                         * TODO Add trigger identification (name/id) when
@@ -5178,18 +5316,16 @@ int handle_notification_thread_channel_sample(
                         */
                        WARN("No space left when enqueuing action associated with buffer-condition trigger");
                        ret = 0;
-                       goto put_list;
+                       goto end_unlock;
                default:
                        abort();
                }
-
-put_list:
-               notification_client_list_put(client_list);
-               if (caa_unlikely(ret)) {
-                       break;
-               }
        }
 end_unlock:
+       if (session) {
+               session->last_state_sample = session_new_sample;
+       }
+       session_info_put(session);
        rcu_read_unlock();
 end:
        return ret;
index b53c24b2398a98f77de8534d210b72677eb250de..1cab09d9707347530d34c1b5bbb5e87f46aec1b4 100644 (file)
@@ -28,6 +28,18 @@ struct channel_key {
        enum lttng_domain_type domain;
 };
 
+struct session_state_sample {
+       uint64_t consumed_data_size;
+       struct {
+               /* Whether a rotation is ongoing for this session. */
+               bool ongoing;
+               /* Identifier of the currently ongoing rotation. */
+               uint64_t id;
+               /* Location of last completed rotation. */
+               struct lttng_trace_archive_location *location;
+       } rotation;
+};
+
 struct session_info {
        struct lttng_ref ref;
        uint64_t id;
@@ -48,13 +60,8 @@ struct session_info {
         * destruction.
         */
        struct cds_lfht *sessions_ht;
-       uint64_t consumed_data_size;
-       struct {
-               /* Whether a rotation is ongoing for this session. */
-               bool ongoing;
-               /* Identifier of the currently ongoing rotation. */
-               uint64_t id;
-       } rotation;
+       /* Session's state as of the latest update. */
+       struct session_state_sample last_state_sample;
        /* call_rcu delayed reclaim. */
        struct rcu_head rcu_node;
 };
index 33f97c77e66a2e074718ad8ce81b8073c3abe7e8..bb0ec0a2436db1a9fb042b29b1e53b622c2eb60e 100644 (file)
@@ -1075,10 +1075,12 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                if (stream->globally_visible) {
                        pthread_mutex_lock(&the_consumer_data.lock);
                        pthread_mutex_lock(&stream->chan->lock);
+
                        pthread_mutex_lock(&stream->lock);
                        /* Remove every reference of the stream in the consumer. */
                        consumer_stream_delete(stream, ht);
 
+
                        destroy_close_stream(stream);
 
                        /* Update channel's refcount of the stream. */
index 4e308383d22e5a1c22e7f9442c5b9a9dc87d1020..8dae11fa2d5b465ccc3dab61f2aed7b948ab518a 100644 (file)
@@ -645,9 +645,10 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel
                        consumer_timer_thread_get_channel_monitor_pipe();
        struct lttcomm_consumer_channel_monitor_msg msg = {
                .key = channel->key,
+               .session_id = channel->session_id,
                .lowest = 0,
                .highest = 0,
-               .total_consumed = 0,
+               .consumed_since_last_sample = 0,
        };
        sample_positions_cb sample;
        get_consumed_cb get_consumed;
@@ -681,9 +682,10 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel
        if (ret) {
                return;
        }
+
        msg.highest = highest;
        msg.lowest = lowest;
-       msg.total_consumed = total_consumed;
+       msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent;
 
        /*
         * Writes performed here are assumed to be atomic which is only
@@ -706,6 +708,7 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel
                DBG("Sent channel monitoring sample for channel key %" PRIu64
                                ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
                                channel->key, msg.highest, msg.lowest);
+               channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample;
        }
 }
 
index 8326a8e391d64d7506fcab19a981558955fcfd0b..3272c129fe353c2ad6b75f3f359391ce55ced9bf 100644 (file)
@@ -383,6 +383,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                consumer_timer_monitor_stop(channel);
        }
 
+       /*
+        * Send a last buffer statistics sample to the session daemon
+        * to ensure it tracks the amount of data consumed by this channel.
+        */
+       sample_and_send_channel_buffer_stats(channel);
+
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
index 44dd5d1bb4582d5e740499c89f925db9ddd6b30e..fb7f59bb21883ec8c8888c1069ab66020b548d45 100644 (file)
@@ -253,6 +253,7 @@ struct lttng_consumer_channel {
        uint64_t lost_packets;
 
        bool streams_sent_to_relayd;
+       uint64_t last_consumed_size_sample_sent;
 };
 
 struct stream_subbuffer {
index 257d70206799c9769290754913bf2d816f84198f..4bcc77bbbb3035ab08e7d2313e42a8538165f995 100644 (file)
@@ -848,6 +848,8 @@ struct lttcomm_consumer_msg {
 struct lttcomm_consumer_channel_monitor_msg {
        /* Key of the sampled channel. */
        uint64_t key;
+       /* Id of the sampled channel's session. */
+       uint64_t session_id;
        /*
         * Lowest and highest usage (bytes) at the moment the sample was taken.
         */
@@ -855,7 +857,7 @@ struct lttcomm_consumer_channel_monitor_msg {
        /*
         * Sum of all the consumed positions for a channel.
         */
-       uint64_t total_consumed;
+       uint64_t consumed_since_last_sample;
 } LTTNG_PACKED;
 
 /*
This page took 0.041607 seconds and 4 git commands to generate.