Fix: sessiond: size-based rotation threshold exceeded in per-pid tracing (2/2)
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.cpp
index 586a9900b641419b39a0729dfa16e00459a7fed0..1e21fffe7bfb689525b082a1492552021f861108 100644 (file)
@@ -33,6 +33,7 @@
 #include <lttng/notification/channel-internal.hpp>
 #include <lttng/trigger/trigger-internal.hpp>
 #include <lttng/event-rule/event-rule-internal.hpp>
+#include <lttng/location-internal.hpp>
 
 #include <time.h>
 #include <unistd.h>
@@ -80,11 +81,7 @@ struct lttng_channel_trigger_list {
  *   - lttng_session_trigger_list_add()
  */
 struct lttng_session_trigger_list {
-       /*
-        * Not owned by this; points to the session_info structure's
-        * session name.
-        */
-       const char *session_name;
+       char *session_name;
        /* List of struct lttng_trigger_list_element. */
        struct cds_list_head list;
        /* Node in the session_triggers_ht */
@@ -137,7 +134,6 @@ struct channel_state_sample {
        struct cds_lfht_node channel_state_ht_node;
        uint64_t highest_usage;
        uint64_t lowest_usage;
-       uint64_t channel_total_consumed;
        /* call_rcu delayed reclaim. */
        struct rcu_head rcu_node;
 };
@@ -149,8 +145,6 @@ static int evaluate_buffer_condition(const struct lttng_condition *condition,
                const struct notification_thread_state *state,
                const struct channel_state_sample *previous_sample,
                const struct channel_state_sample *latest_sample,
-               uint64_t previous_session_consumed_total,
-               uint64_t latest_session_consumed_total,
                struct channel_info *channel_info);
 static
 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
@@ -530,10 +524,10 @@ enum lttng_object_type get_condition_binding_object(
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
                return LTTNG_OBJECT_TYPE_CHANNEL;
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
                return LTTNG_OBJECT_TYPE_SESSION;
        case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
                return LTTNG_OBJECT_TYPE_NONE;
@@ -593,6 +587,7 @@ void session_info_destroy(void *_data)
                        &session_info->sessions_ht_node);
        rcu_read_unlock();
        free(session_info->name);
+       lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
        call_rcu(&session_info->rcu_node, free_session_info_rcu);
 }
 
@@ -964,7 +959,6 @@ int evaluate_channel_condition_for_client(
 
        ret = evaluate_buffer_condition(condition, evaluation, state,
                        NULL, last_sample,
-                       0, channel_info->session_info->consumed_data_size,
                        channel_info);
        if (ret) {
                WARN("Fatal error occurred while evaluating a newly subscribed-to condition");
@@ -1011,62 +1005,141 @@ end:
 }
 
 static
-int evaluate_session_condition_for_client(
+bool evaluate_session_rotation_ongoing_condition(const struct lttng_condition *condition
+               __attribute__((unused)),
+               const struct session_state_sample *sample)
+{
+       return sample->rotation.ongoing;
+}
+
+static
+bool evaluate_session_consumed_size_condition(
                const struct lttng_condition *condition,
-               struct notification_thread_state *state,
-               struct lttng_evaluation **evaluation,
-               uid_t *session_uid, gid_t *session_gid)
+               const struct session_state_sample *sample)
+{
+       uint64_t threshold;
+       const struct lttng_condition_session_consumed_size *size_condition =
+                       lttng::utils::container_of(condition,
+                               &lttng_condition_session_consumed_size::parent);
+
+       threshold = size_condition->consumed_threshold_bytes.value;
+       DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+                       threshold, sample->consumed_data_size);
+       return sample->consumed_data_size >= threshold;
+}
+
+/*
+ * `new_state` can be NULL to indicate that we are not evaluating a
+ * state transition. A client subscribed or a trigger was registered and
+ * we wish to perform an initial evaluation.
+ */
+static
+int evaluate_session_condition(
+               const struct lttng_condition *condition,
+               const struct session_info *session_info,
+               const struct session_state_sample *new_state,
+               struct lttng_evaluation **evaluation)
 {
        int ret;
-       const char *session_name;
-       struct session_info *session_info = NULL;
+       bool previous_result, newest_result;
 
-       rcu_read_lock();
-       session_name = get_condition_session_name(condition);
+       switch (lttng_condition_get_type(condition)) {
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
+               if (new_state) {
+                       previous_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, &session_info->last_state_sample);
+                       newest_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, new_state);
+               } else {
+                       previous_result = false;
+                       newest_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, &session_info->last_state_sample);
+               }
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               if (new_state) {
+                       previous_result = evaluate_session_consumed_size_condition(
+                                       condition, &session_info->last_state_sample);
+                       newest_result = evaluate_session_consumed_size_condition(
+                                       condition, new_state);
+               } else {
+                       previous_result = false;
+                       newest_result = evaluate_session_consumed_size_condition(
+                                       condition, &session_info->last_state_sample);
+               }
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+               /*
+                * Note that LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED is
+                * evaluated differently to only consider state transitions without regard for the
+                * initial state. This is a deliberate choice as it is unlikely that a user would
+                * expect an action to occur for a rotation that occurred long before the trigger or
+                * subscription occurred.
+                */
+               if (!new_state) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Find the session associated with the condition. */
-       session_info = get_session_info_by_name(state, session_name);
-       if (!session_info) {
-               DBG("Unknown session while evaluating session condition for client: name = `%s`",
-                               session_name);
+               previous_result = !session_info->last_state_sample.rotation.ongoing;
+               newest_result = !new_state->rotation.ongoing;
+               break;
+       default:
+               ret = 0;
+               goto end;
+       }
+
+       if (!newest_result || (previous_result == newest_result)) {
+               /* Not a state transition, evaluate to false. */
                ret = 0;
                goto end;
        }
 
-       /*
-        * Evaluation is performed in-line here since only one type of
-        * session-bound condition is handled for the moment.
-        */
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
-               if (!session_info->rotation.ongoing) {
-                       ret = 0;
-                       goto end_session_put;
-               }
+       {
+               const auto rotation_id = new_state ?
+                               new_state->rotation.id :
+                                     session_info->last_state_sample.rotation.id;
 
-               *evaluation = lttng_evaluation_session_rotation_ongoing_create(
-                               session_info->rotation.id);
-               if (!*evaluation) {
-                       /* Fatal error. */
-                       ERR("Failed to create session rotation ongoing evaluation for session \"%s\"",
-                                       session_info->name);
-                       ret = -1;
-                       goto end_session_put;
-               }
-               ret = 0;
+               *evaluation = lttng_evaluation_session_rotation_ongoing_create(rotation_id);
+               break;
+       }
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       {
+               const auto rotation_id = new_state ?
+                               new_state->rotation.id :
+                                     session_info->last_state_sample.rotation.id;
+
+               /* Callee acquires a reference to location. */
+               *evaluation = lttng_evaluation_session_rotation_completed_create(
+                               rotation_id, new_state->rotation.location);
+               break;
+       }
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+       {
+               const auto latest_session_consumed_total = new_state ?
+                               new_state->consumed_data_size :
+                                     session_info->last_state_sample.consumed_data_size;
+
+               *evaluation = lttng_evaluation_session_consumed_size_create(
+                               latest_session_consumed_total);
                break;
+       }
        default:
-               ret = 0;
-               goto end_session_put;
+               abort();
        }
 
-       *session_uid = session_info->uid;
-       *session_gid = session_info->gid;
+       if (!*evaluation) {
+               /* Fatal error. */
+               ERR("Failed to create session condition evaluation: session name = `%s`",
+                               session_info->name);
+               ret = -1;
+               goto end;
+       }
 
-end_session_put:
-       session_info_put(session_info);
+       ret = 0;
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1099,9 +1172,25 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger,
 
        switch (get_condition_binding_object(condition)) {
        case LTTNG_OBJECT_TYPE_SESSION:
-               ret = evaluate_session_condition_for_client(condition, state,
-                               &evaluation, &object_uid, &object_gid);
+       {
+               /* Find the session associated with the condition. */
+               const auto *session_name = get_condition_session_name(condition);
+               auto session_info = get_session_info_by_name(state, session_name);
+               if (!session_info) {
+                       /* Not an error, the session doesn't exist yet. */
+                       DBG("Session not found while evaluating session condition for client: session name = `%s`",
+                                       session_name);
+                       ret = 0;
+                       goto end;
+               }
+
+               object_uid = session_info->uid;
+               object_gid = session_info->gid;
+
+               ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+               session_info_put(session_info);
                break;
+       }
        case LTTNG_OBJECT_TYPE_CHANNEL:
                ret = evaluate_channel_condition_for_client(condition, state,
                                &evaluation, &object_uid, &object_gid);
@@ -1453,27 +1542,6 @@ fail:
        return false;
 }
 
-static
-bool session_consumed_size_condition_applies_to_channel(
-               const struct lttng_condition *condition,
-               const struct channel_info *channel_info)
-{
-       enum lttng_condition_status status;
-       const char *condition_session_name = NULL;
-
-       status = lttng_condition_session_consumed_size_get_session_name(
-                       condition, &condition_session_name);
-       LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
-
-       if (strcmp(channel_info->session_info->name, condition_session_name)) {
-               goto fail;
-       }
-
-       return true;
-fail:
-       return false;
-}
-
 static
 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
                const struct channel_info *channel_info)
@@ -1492,10 +1560,6 @@ bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
                trigger_applies = buffer_usage_condition_applies_to_channel(
                                condition, channel_info);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               trigger_applies = session_consumed_size_condition_applies_to_channel(
-                               condition, channel_info);
-               break;
        default:
                goto fail;
        }
@@ -1543,23 +1607,27 @@ end:
 /*
  * Allocate an empty lttng_session_trigger_list for the session named
  * 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
  */
 static
 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
                const char *session_name,
                struct cds_lfht *session_triggers_ht)
 {
-       struct lttng_session_trigger_list *list;
+       struct lttng_session_trigger_list *list = NULL;
+       char *session_name_copy = strdup(session_name);
+
+       if (!session_name_copy) {
+               PERROR("Failed to allocate session name while building trigger list");
+               goto end;
+       }
 
        list = zmalloc<lttng_session_trigger_list>();
        if (!list) {
+               PERROR("Failed to allocate session trigger list while building trigger list");
                goto end;
        }
-       list->session_name = session_name;
+
+       list->session_name = session_name_copy;
        CDS_INIT_LIST_HEAD(&list->list);
        cds_lfht_node_init(&list->session_triggers_ht_node);
        list->session_triggers_ht = session_triggers_ht;
@@ -1577,8 +1645,11 @@ end:
 static
 void free_session_trigger_list_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct lttng_session_trigger_list,
-                       rcu_node));
+       struct lttng_session_trigger_list *list =
+                       caa_container_of(node, struct lttng_session_trigger_list, rcu_node);
+
+       free(list->session_name);
+       free(list);
 }
 
 static
@@ -1630,17 +1701,11 @@ bool trigger_applies_to_session(const struct lttng_trigger *trigger,
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
        {
-               enum lttng_condition_status condition_status;
                const char *condition_session_name;
 
-               condition_status = lttng_condition_session_rotation_get_session_name(
-                       condition, &condition_session_name);
-               if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-                       ERR("Failed to retrieve session rotation condition's session name");
-                       goto end;
-               }
-
+               condition_session_name = get_condition_session_name(condition);
                LTTNG_ASSERT(condition_session_name);
                applies = !strcmp(condition_session_name, session_name);
                break;
@@ -1655,10 +1720,6 @@ end:
 /*
  * Allocate and initialize an lttng_session_trigger_list which contains
  * all triggers that apply to the session named 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
  */
 static
 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
@@ -2011,6 +2072,7 @@ int handle_notification_thread_command_session_rotation(
        struct lttng_trigger_list_element *trigger_list_element;
        struct session_info *session_info;
        struct lttng_credentials session_creds;
+       struct session_state_sample new_session_state;
 
        rcu_read_lock();
 
@@ -2024,20 +2086,25 @@ int handle_notification_thread_command_session_rotation(
                goto end;
        }
 
+       new_session_state = session_info->last_state_sample;
+       if (location) {
+               lttng_trace_archive_location_get(location);
+               new_session_state.rotation.location = location;
+       } else {
+               new_session_state.rotation.location = NULL;
+       }
+
        session_creds = {
                .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid),
                .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid),
        };
 
-       session_info->rotation.ongoing =
-                       cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
-       session_info->rotation.id = trace_archive_chunk_id;
+       new_session_state.rotation.ongoing = cmd_type ==
+                       NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
+       new_session_state.rotation.id = trace_archive_chunk_id;
+
        trigger_list = get_session_trigger_list(state, session_info->name);
-       if (!trigger_list) {
-               DBG("No triggers apply to session: session name = `%s` ",
-                               session_info->name);
-               goto end;
-       }
+       LTTNG_ASSERT(trigger_list);
 
        cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
                        node) {
@@ -2045,45 +2112,34 @@ int handle_notification_thread_command_session_rotation(
                struct lttng_trigger *trigger;
                struct notification_client_list *client_list;
                struct lttng_evaluation *evaluation = NULL;
-               enum lttng_condition_type condition_type;
                enum action_executor_status executor_status;
 
                trigger = trigger_list_element->trigger;
                condition = lttng_trigger_get_const_condition(trigger);
                LTTNG_ASSERT(condition);
-               condition_type = lttng_condition_get_type(condition);
 
-               if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
-                               cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
-                       continue;
-               } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
-                               cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
-                       continue;
-               }
-
-               client_list = get_client_list_from_condition(state, condition);
-               if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
-                       evaluation = lttng_evaluation_session_rotation_ongoing_create(
-                                       trace_archive_chunk_id);
-               } else {
-                       evaluation = lttng_evaluation_session_rotation_completed_create(
-                                       trace_archive_chunk_id, location);
+               ret = evaluate_session_condition(
+                               condition, session_info, &new_session_state, &evaluation);
+               if (ret) {
+                       ret = -1;
+                       cmd_result = LTTNG_ERR_NOMEM;
+                       goto end;
                }
 
                if (!evaluation) {
-                       /* Internal error */
-                       ret = -1;
-                       cmd_result = LTTNG_ERR_UNK;
-                       goto put_list;
+                       continue;
                }
 
                /*
                 * Ownership of `evaluation` transferred to the action executor
-                * no matter the result.
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
                 */
+               client_list = get_client_list_from_condition(state, condition);
                executor_status = action_executor_enqueue_trigger(
                                state->executor, trigger, evaluation,
                                &session_creds, client_list);
+               notification_client_list_put(client_list);
                evaluation = NULL;
                switch (executor_status) {
                case ACTION_EXECUTOR_STATUS_OK:
@@ -2096,7 +2152,7 @@ int handle_notification_thread_command_session_rotation(
                         */
                        ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
                        ret = -1;
-                       goto put_list;
+                       goto end;
                case ACTION_EXECUTOR_STATUS_OVERFLOW:
                        /*
                         * TODO Add trigger identification (name/id) when
@@ -2106,18 +2162,19 @@ int handle_notification_thread_command_session_rotation(
                         */
                        WARN("No space left when enqueuing action associated with session rotation trigger");
                        ret = 0;
-                       goto put_list;
+                       goto end;
                default:
                        abort();
                }
-
-put_list:
-               notification_client_list_put(client_list);
-               if (caa_unlikely(ret)) {
-                       break;
-               }
        }
+
 end:
+       if (session_info) {
+               /* Ownership of new_session_state::location is transferred. */
+               lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
+               session_info->last_state_sample = new_session_state;
+       }
+
        session_info_put(session_info);
        *_cmd_result = cmd_result;
        rcu_read_unlock();
@@ -2536,25 +2593,7 @@ int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
        ASSERT_RCU_READ_LOCKED();
 
        condition = lttng_trigger_get_const_condition(trigger);
-       switch (lttng_condition_get_type(condition)) {
-       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
-       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
-       {
-               enum lttng_condition_status status;
-
-               status = lttng_condition_session_rotation_get_session_name(
-                               condition, &session_name);
-               if (status != LTTNG_CONDITION_STATUS_OK) {
-                       ERR("Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
-                       ret = -1;
-                       goto end;
-               }
-               break;
-       }
-       default:
-               ret = -1;
-               goto end;
-       }
+       session_name = get_condition_session_name(condition);
 
        trigger_list = get_session_trigger_list(state, session_name);
        if (!trigger_list) {
@@ -3001,12 +3040,25 @@ int handle_notification_thread_command_register_trigger(
         */
        switch (get_condition_binding_object(condition)) {
        case LTTNG_OBJECT_TYPE_SESSION:
-               ret = evaluate_session_condition_for_client(condition, state,
-                               &evaluation, &object_uid,
-                               &object_gid);
-               LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
-               LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
+       {
+               /* Find the session associated with the condition. */
+               const auto *session_name = get_condition_session_name(condition);
+               auto session_info = get_session_info_by_name(state, session_name);
+               if (!session_info) {
+                       /* Not an error, the session doesn't exist yet. */
+                       DBG("Session not found while evaluating session condition during registration of trigger: session name = `%s`",
+                                       session_name);
+                       ret = 0;
+                       goto success;
+               }
+
+               LTTNG_OPTIONAL_SET(&object_creds.uid, session_info->uid);
+               LTTNG_OPTIONAL_SET(&object_creds.gid, session_info->gid);
+
+               ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+               session_info_put(session_info);
                break;
+       }
        case LTTNG_OBJECT_TYPE_CHANNEL:
                ret = evaluate_channel_condition_for_client(condition, state,
                                &evaluation, &object_uid,
@@ -3139,6 +3191,35 @@ void teardown_tracer_notifier(struct notification_thread_state *state,
        }
 }
 
+static
+void remove_trigger_from_session_trigger_list(
+       struct lttng_session_trigger_list *trigger_list,
+       const struct lttng_trigger *trigger)
+{
+       bool found = false;
+       struct lttng_trigger_list_element *trigger_element, *tmp;
+
+       cds_list_for_each_entry_safe (trigger_element, tmp, &trigger_list->list, node) {
+               if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+                       continue;
+               }
+
+               DBG("Removed trigger from session_triggers_ht");
+               cds_list_del(&trigger_element->node);
+               free(trigger_element);
+               /* A trigger can only appear once per session. */
+               found = true;
+               break;
+       }
+
+       if (!found) {
+               ERR("Failed to find trigger associated with session: session name = `%s`",
+                               trigger_list->session_name);
+       }
+
+       LTTNG_ASSERT(found);
+}
+
 static
 int handle_notification_thread_command_unregister_trigger(
                struct notification_thread_state *state,
@@ -3147,7 +3228,6 @@ int handle_notification_thread_command_unregister_trigger(
 {
        struct cds_lfht_iter iter;
        struct cds_lfht_node *triggers_ht_node;
-       struct lttng_channel_trigger_list *trigger_list;
        struct notification_client_list *client_list;
        struct lttng_trigger_ht_element *trigger_ht_element = NULL;
        const struct lttng_condition *condition = lttng_trigger_get_const_condition(
@@ -3172,22 +3252,56 @@ int handle_notification_thread_command_unregister_trigger(
        trigger_ht_element = caa_container_of(triggers_ht_node,
                        struct lttng_trigger_ht_element, node);
 
-       /* Remove trigger from channel_triggers_ht. */
-       cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
-                       channel_triggers_ht_node) {
-               struct lttng_trigger_list_element *trigger_element, *tmp;
+       switch (get_condition_binding_object(condition)) {
+       case LTTNG_OBJECT_TYPE_CHANNEL:
+       {
+               struct lttng_channel_trigger_list *trigger_list;
 
-               cds_list_for_each_entry_safe(trigger_element, tmp,
-                               &trigger_list->list, node) {
-                       if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
-                               continue;
+               /*
+                * Remove trigger from channel_triggers_ht.
+                *
+                * Note that multiple channels may have matched the trigger's
+                * condition (e.g. all instances of a given channel in per-pid buffering
+                * mode).
+                *
+                * Iterate on all lists since we don't know the target channels' keys.
+                */
+               cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
+                               channel_triggers_ht_node) {
+                       struct lttng_trigger_list_element *trigger_element, *tmp;
+
+                       cds_list_for_each_entry_safe(
+                                       trigger_element, tmp, &trigger_list->list, node) {
+                               if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+                                       continue;
+                               }
+
+                               DBG("Removed trigger from channel_triggers_ht");
+                               cds_list_del(&trigger_element->node);
+                               /* A trigger can only appear once per channel */
+                               break;
                        }
+               }
+               break;
+       }
+       case LTTNG_OBJECT_TYPE_SESSION:
+       {
+               auto session = get_session_info_by_name(
+                               state, get_condition_session_name(condition));
 
-                       DBG("Removed trigger from channel_triggers_ht");
-                       cds_list_del(&trigger_element->node);
-                       /* A trigger can only appear once per channel */
+               /* Session doesn't exist, no trigger to remove. */
+               if (!session) {
                        break;
                }
+
+               auto session_trigger_list = get_session_trigger_list(state, session->name);
+               remove_trigger_from_session_trigger_list(session_trigger_list, trigger);
+               session_info_put(session);
+       }
+       case LTTNG_OBJECT_TYPE_NONE:
+               break;
+       default:
+               abort();
        }
 
        if (lttng_trigger_needs_tracer_notifier(trigger)) {
@@ -4365,30 +4479,12 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
        return result;
 }
 
-static
-bool evaluate_session_consumed_size_condition(
-               const struct lttng_condition *condition,
-               uint64_t session_consumed_size)
-{
-       uint64_t threshold;
-       const struct lttng_condition_session_consumed_size *size_condition =
-                       lttng::utils::container_of(condition,
-                               &lttng_condition_session_consumed_size::parent);
-
-       threshold = size_condition->consumed_threshold_bytes.value;
-       DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
-                       threshold, session_consumed_size);
-       return session_consumed_size >= threshold;
-}
-
 static
 int evaluate_buffer_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
                const struct notification_thread_state *state __attribute__((unused)),
                const struct channel_state_sample *previous_sample,
                const struct channel_state_sample *latest_sample,
-               uint64_t previous_session_consumed_total,
-               uint64_t latest_session_consumed_total,
                struct channel_info *channel_info)
 {
        int ret = 0;
@@ -4411,18 +4507,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition,
                                condition, latest_sample,
                                channel_info->capacity);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               if (caa_likely(previous_sample_available)) {
-                       previous_sample_result =
-                               evaluate_session_consumed_size_condition(
-                                       condition,
-                                       previous_session_consumed_total);
-               }
-               latest_sample_result =
-                               evaluate_session_consumed_size_condition(
-                                       condition,
-                                       latest_session_consumed_total);
-               break;
        default:
                /* Unknown condition type; internal error. */
                abort();
@@ -4451,10 +4535,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition,
                                latest_sample->highest_usage,
                                channel_info->capacity);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               *evaluation = lttng_evaluation_session_consumed_size_create(
-                               latest_session_consumed_total);
-               break;
        default:
                abort();
        }
@@ -4983,15 +5063,18 @@ int handle_notification_thread_channel_sample(
 {
        int ret = 0;
        struct lttcomm_consumer_channel_monitor_msg sample_msg;
-       struct channel_info *channel_info;
+       struct channel_info *channel_info = NULL;
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
-       struct lttng_channel_trigger_list *trigger_list;
+       struct lttng_channel_trigger_list *channel_trigger_list;
+       struct lttng_session_trigger_list *session_trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        bool previous_sample_available = false;
-       struct channel_state_sample previous_sample, latest_sample;
-       uint64_t previous_session_consumed_total, latest_session_consumed_total;
-       struct lttng_credentials channel_creds;
+       struct channel_state_sample channel_previous_sample, channel_new_sample;
+       struct session_state_sample session_new_sample;
+       struct lttng_credentials channel_creds = {};
+       struct lttng_credentials session_creds = {};
+       struct session_info *session;
 
        /*
         * The monitoring pipe only holds messages smaller than PIPE_BUF,
@@ -5006,19 +5089,95 @@ int handle_notification_thread_channel_sample(
        }
 
        ret = 0;
-       latest_sample.key.key = sample_msg.key;
-       latest_sample.key.domain = domain;
-       latest_sample.highest_usage = sample_msg.highest;
-       latest_sample.lowest_usage = sample_msg.lowest;
-       latest_sample.channel_total_consumed = sample_msg.total_consumed;
+       channel_new_sample.key.key = sample_msg.key;
+       channel_new_sample.key.domain = domain;
+       channel_new_sample.highest_usage = sample_msg.highest;
+       channel_new_sample.lowest_usage = sample_msg.lowest;
 
        rcu_read_lock();
 
+       session = get_session_info_by_id(state, sample_msg.session_id);
+       if (!session) {
+               DBG("Received a sample for an unknown session from consumerd: session id = %" PRIu64,
+                               sample_msg.session_id);
+               goto end_unlock;
+       }
+
+       session_new_sample = session->last_state_sample;
+       session_new_sample.consumed_data_size += sample_msg.consumed_since_last_sample;
+       session_creds = {
+               .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
+               .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
+       };
+
+       session_trigger_list = get_session_trigger_list(state, session->name);
+       LTTNG_ASSERT(session_trigger_list);
+       cds_list_for_each_entry(trigger_list_element, &session_trigger_list->list,
+                       node) {
+               const struct lttng_condition *condition;
+               struct lttng_trigger *trigger;
+               struct notification_client_list *client_list = NULL;
+               struct lttng_evaluation *evaluation = NULL;
+               enum action_executor_status executor_status;
+
+               ret = 0;
+               trigger = trigger_list_element->trigger;
+               condition = lttng_trigger_get_const_condition(trigger);
+               LTTNG_ASSERT(condition);
+
+               ret = evaluate_session_condition(
+                               condition, session, &session_new_sample, &evaluation);
+               if (caa_unlikely(ret)) {
+                       break;
+               }
+
+               if (caa_likely(!evaluation)) {
+                       continue;
+               }
+
+               /*
+                * Ownership of `evaluation` transferred to the action executor
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
+                */
+               client_list = get_client_list_from_condition(state, condition);
+               executor_status = action_executor_enqueue_trigger(
+                               state->executor, trigger, evaluation,
+                               &session_creds, client_list);
+               notification_client_list_put(client_list);
+               evaluation = NULL;
+               switch (executor_status) {
+               case ACTION_EXECUTOR_STATUS_OK:
+                       break;
+               case ACTION_EXECUTOR_STATUS_ERROR:
+               case ACTION_EXECUTOR_STATUS_INVALID:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        */
+                       ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
+                       ret = -1;
+                       goto end_unlock;
+               case ACTION_EXECUTOR_STATUS_OVERFLOW:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        *
+                        * Not a fatal error.
+                        */
+                       WARN("No space left when enqueuing action associated with buffer-condition trigger");
+                       ret = 0;
+                       goto end_unlock;
+               default:
+                       abort();
+               }
+       }
+
        /* Retrieve the channel's informations */
        cds_lfht_lookup(state->channels_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_info,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (caa_unlikely(!node)) {
@@ -5029,28 +5188,26 @@ int handle_notification_thread_channel_sample(
                 * sample.
                 */
                DBG("Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
-                               latest_sample.key.key,
+                               channel_new_sample.key.key,
                                lttng_domain_type_str(domain));
                goto end_unlock;
        }
+
        channel_info = caa_container_of(node, struct channel_info,
                        channels_ht_node);
-       DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
+       DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", consumed since last sample = %" PRIu64")",
                        channel_info->name,
-                       latest_sample.key.key,
+                       channel_new_sample.key.key,
                        channel_info->session_info->name,
-                       latest_sample.highest_usage,
-                       latest_sample.lowest_usage,
-                       latest_sample.channel_total_consumed);
-
-       previous_session_consumed_total =
-                       channel_info->session_info->consumed_data_size;
+                       channel_new_sample.highest_usage,
+                       channel_new_sample.lowest_usage,
+                       sample_msg.consumed_since_last_sample);
 
        /* Retrieve the channel's last sample, if it exists, and update it. */
        cds_lfht_lookup(state->channel_state_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_state_sample,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (caa_likely(node)) {
@@ -5061,16 +5218,11 @@ int handle_notification_thread_channel_sample(
                                struct channel_state_sample,
                                channel_state_ht_node);
 
-               memcpy(&previous_sample, stored_sample,
-                               sizeof(previous_sample));
-               stored_sample->highest_usage = latest_sample.highest_usage;
-               stored_sample->lowest_usage = latest_sample.lowest_usage;
-               stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
+               memcpy(&channel_previous_sample, stored_sample,
+                               sizeof(channel_previous_sample));
+               stored_sample->highest_usage = channel_new_sample.highest_usage;
+               stored_sample->lowest_usage = channel_new_sample.lowest_usage;
                previous_sample_available = true;
-
-               latest_session_consumed_total =
-                               previous_session_consumed_total +
-                               (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
        } else {
                /*
                 * This is the channel's first sample, allocate space for and
@@ -5084,39 +5236,30 @@ int handle_notification_thread_channel_sample(
                        goto end_unlock;
                }
 
-               memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
+               memcpy(stored_sample, &channel_new_sample, sizeof(*stored_sample));
                cds_lfht_node_init(&stored_sample->channel_state_ht_node);
                cds_lfht_add(state->channel_state_ht,
                                hash_channel_key(&stored_sample->key),
                                &stored_sample->channel_state_ht_node);
-
-               latest_session_consumed_total =
-                               previous_session_consumed_total +
-                               latest_sample.channel_total_consumed;
        }
 
-       channel_info->session_info->consumed_data_size =
-                       latest_session_consumed_total;
-
        /* Find triggers associated with this channel. */
        cds_lfht_lookup(state->channel_triggers_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_trigger_list,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
-       if (caa_likely(!node)) {
-               goto end_unlock;
-       }
+       LTTNG_ASSERT(node);
 
        channel_creds = (typeof(channel_creds)) {
                .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
                .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
        };
 
-       trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
+       channel_trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
                        channel_triggers_ht_node);
-       cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
+       cds_list_for_each_entry(trigger_list_element, &channel_trigger_list->list,
                        node) {
                const struct lttng_condition *condition;
                struct lttng_trigger *trigger;
@@ -5129,33 +5272,28 @@ int handle_notification_thread_channel_sample(
                condition = lttng_trigger_get_const_condition(trigger);
                LTTNG_ASSERT(condition);
 
-               /*
-                * Check if any client is subscribed to the result of this
-                * evaluation.
-                */
-               client_list = get_client_list_from_condition(state, condition);
-
                ret = evaluate_buffer_condition(condition, &evaluation, state,
-                               previous_sample_available ? &previous_sample : NULL,
-                               &latest_sample,
-                               previous_session_consumed_total,
-                               latest_session_consumed_total,
+                               previous_sample_available ? &channel_previous_sample : NULL,
+                               &channel_new_sample,
                                channel_info);
                if (caa_unlikely(ret)) {
-                       goto put_list;
+                       break;
                }
 
                if (caa_likely(!evaluation)) {
-                       goto put_list;
+                       continue;
                }
 
                /*
                 * Ownership of `evaluation` transferred to the action executor
-                * no matter the result.
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
                 */
+               client_list = get_client_list_from_condition(state, condition);
                executor_status = action_executor_enqueue_trigger(
                                state->executor, trigger, evaluation,
                                &channel_creds, client_list);
+               notification_client_list_put(client_list);
                evaluation = NULL;
                switch (executor_status) {
                case ACTION_EXECUTOR_STATUS_OK:
@@ -5168,7 +5306,7 @@ int handle_notification_thread_channel_sample(
                         */
                        ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
                        ret = -1;
-                       goto put_list;
+                       goto end_unlock;
                case ACTION_EXECUTOR_STATUS_OVERFLOW:
                        /*
                         * TODO Add trigger identification (name/id) when
@@ -5178,18 +5316,16 @@ int handle_notification_thread_channel_sample(
                         */
                        WARN("No space left when enqueuing action associated with buffer-condition trigger");
                        ret = 0;
-                       goto put_list;
+                       goto end_unlock;
                default:
                        abort();
                }
-
-put_list:
-               notification_client_list_put(client_list);
-               if (caa_unlikely(ret)) {
-                       break;
-               }
        }
 end_unlock:
+       if (session) {
+               session->last_state_sample = session_new_sample;
+       }
+       session_info_put(session);
        rcu_read_unlock();
 end:
        return ret;
This page took 0.036092 seconds and 4 git commands to generate.