Implement rotation command handlers in notification system
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
9b63a4aa 31#include <lttng/action/action-internal.h>
ab0ee2ca
JG
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
e8360425 35#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 36#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 37#include <lttng/notification/channel-internal.h>
1da26331 38
ab0ee2ca
JG
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
d53ea4e4 43#include <fcntl.h>
ab0ee2ca 44
1da26331
JG
45#include "notification-thread.h"
46#include "notification-thread-events.h"
47#include "notification-thread-commands.h"
48#include "lttng-sessiond.h"
49#include "kernel.h"
50
ab0ee2ca
JG
/*
 * Poll event masks built from the LPOLL* flags. The "IN" mask combines
 * LPOLLIN with the error and hang-up flags; the "IN_OUT" mask is the "IN"
 * mask with LPOLLOUT added.
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLRDHUP | LPOLLHUP | LPOLLERR)
#define CLIENT_POLL_MASK_IN_OUT (LPOLLOUT | CLIENT_POLL_MASK_IN)
53
51eab943
JG
/*
 * Object type discriminator. The enumerators keep the consecutive values
 * they would get implicitly, starting with LTTNG_OBJECT_TYPE_UNKNOWN at 0.
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	LTTNG_OBJECT_TYPE_NONE = 1,
	LTTNG_OBJECT_TYPE_CHANNEL = 2,
	LTTNG_OBJECT_TYPE_SESSION = 3,
};
60
ab0ee2ca 61struct lttng_trigger_list_element {
9e0bb80e
JG
62 /* No ownership of the trigger object is assumed. */
63 const struct lttng_trigger *trigger;
ab0ee2ca
JG
64 struct cds_list_head node;
65};
66
67struct lttng_channel_trigger_list {
68 struct channel_key channel_key;
ea9a44f0 69 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 70 struct cds_list_head list;
ea9a44f0 71 /* Node in the channel_triggers_ht */
ab0ee2ca
JG
72 struct cds_lfht_node channel_triggers_ht_node;
73};
74
ea9a44f0
JG
75/*
76 * List of triggers applying to a given session.
77 *
78 * See:
79 * - lttng_session_trigger_list_create()
80 * - lttng_session_trigger_list_build()
81 * - lttng_session_trigger_list_destroy()
82 * - lttng_session_trigger_list_add()
83 */
84struct lttng_session_trigger_list {
85 /*
86 * Not owned by this; points to the session_info structure's
87 * session name.
88 */
89 const char *session_name;
90 /* List of struct lttng_trigger_list_element. */
91 struct cds_list_head list;
92 /* Node in the session_triggers_ht */
93 struct cds_lfht_node session_triggers_ht_node;
94 /*
95 * Weak reference to the notification system's session triggers
96 * hashtable.
97 *
98 * The session trigger list structure structure is owned by
99 * the session's session_info.
100 *
101 * The session_info is kept alive the the channel_infos holding a
102 * reference to it (reference counting). When those channels are
103 * destroyed (at runtime or on teardown), the reference they hold
104 * to the session_info are released. On destruction of session_info,
105 * session_info_destroy() will remove the list of triggers applying
106 * to this session from the notification system's state.
107 *
108 * This implies that the session_triggers_ht must be destroyed
109 * after the channels.
110 */
111 struct cds_lfht *session_triggers_ht;
112 /* Used for delayed RCU reclaim. */
113 struct rcu_head rcu_node;
114};
115
ab0ee2ca
JG
116struct lttng_trigger_ht_element {
117 struct lttng_trigger *trigger;
118 struct cds_lfht_node node;
119};
120
121struct lttng_condition_list_element {
122 struct lttng_condition *condition;
123 struct cds_list_head node;
124};
125
126struct notification_client_list_element {
127 struct notification_client *client;
128 struct cds_list_head node;
129};
130
131struct notification_client_list {
132 struct lttng_trigger *trigger;
133 struct cds_list_head list;
134 struct cds_lfht_node notification_trigger_ht_node;
135};
136
137struct notification_client {
138 int socket;
139 /* Client protocol version. */
140 uint8_t major, minor;
141 uid_t uid;
142 gid_t gid;
143 /*
5332364f 144 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
145 * checked.
146 */
147 bool validated;
148 /*
149 * Conditions to which the client's notification channel is subscribed.
150 * List of struct lttng_condition_list_node. The condition member is
151 * owned by the client.
152 */
153 struct cds_list_head condition_list;
154 struct cds_lfht_node client_socket_ht_node;
155 struct {
156 struct {
14fa22f8
JG
157 /*
158 * During the reception of a message, the reception
159 * buffers' "size" is set to contain the current
160 * message's complete payload.
161 */
ab0ee2ca
JG
162 struct lttng_dynamic_buffer buffer;
163 /* Bytes left to receive for the current message. */
164 size_t bytes_to_receive;
165 /* Type of the message being received. */
166 enum lttng_notification_channel_message_type msg_type;
167 /*
168 * Indicates whether or not credentials are expected
169 * from the client.
170 */
01ea340e 171 bool expect_creds;
ab0ee2ca
JG
172 /*
173 * Indicates whether or not credentials were received
174 * from the client.
175 */
176 bool creds_received;
14fa22f8 177 /* Only used during credentials reception. */
ab0ee2ca
JG
178 lttng_sock_cred creds;
179 } inbound;
180 struct {
181 /*
182 * Indicates whether or not a notification addressed to
183 * this client was dropped because a command reply was
184 * already buffered.
185 *
186 * A notification is dropped whenever the buffer is not
187 * empty.
188 */
189 bool dropped_notification;
190 /*
191 * Indicates whether or not a command reply is already
192 * buffered. In this case, it means that the client is
193 * not consuming command replies before emitting a new
194 * one. This could be caused by a protocol error or a
195 * misbehaving/malicious client.
196 */
197 bool queued_command_reply;
198 struct lttng_dynamic_buffer buffer;
199 } outbound;
200 } communication;
201};
202
203struct channel_state_sample {
204 struct channel_key key;
205 struct cds_lfht_node channel_state_ht_node;
206 uint64_t highest_usage;
207 uint64_t lowest_usage;
e8360425 208 uint64_t channel_total_consumed;
ab0ee2ca
JG
209};
210
/* Helpers defined further down but needed earlier in the file. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);

/* session_info API */
static void session_info_destroy(void *_data);
static void session_info_get(struct session_info *session_info);
static void session_info_put(struct session_info *session_info);
static struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static int lttng_session_trigger_list_add(
		struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
262
263
ab0ee2ca
JG
264static
265int match_client(struct cds_lfht_node *node, const void *key)
266{
267 /* This double-cast is intended to supress pointer-to-cast warning. */
268 int socket = (int) (intptr_t) key;
269 struct notification_client *client;
270
271 client = caa_container_of(node, struct notification_client,
272 client_socket_ht_node);
273
274 return !!(client->socket == socket);
275}
276
277static
278int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
279{
280 struct channel_key *channel_key = (struct channel_key *) key;
281 struct lttng_channel_trigger_list *trigger_list;
282
283 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
284 channel_triggers_ht_node);
285
286 return !!((channel_key->key == trigger_list->channel_key.key) &&
287 (channel_key->domain == trigger_list->channel_key.domain));
288}
289
51eab943
JG
290static
291int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
292{
293 const char *session_name = (const char *) key;
294 struct lttng_session_trigger_list *trigger_list;
295
296 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
297 session_triggers_ht_node);
298
299 return !!(strcmp(trigger_list->session_name, session_name) == 0);
300}
301
ab0ee2ca
JG
302static
303int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
304{
305 struct channel_key *channel_key = (struct channel_key *) key;
306 struct channel_state_sample *sample;
307
308 sample = caa_container_of(node, struct channel_state_sample,
309 channel_state_ht_node);
310
311 return !!((channel_key->key == sample->key.key) &&
312 (channel_key->domain == sample->key.domain));
313}
314
315static
316int match_channel_info(struct cds_lfht_node *node, const void *key)
317{
318 struct channel_key *channel_key = (struct channel_key *) key;
319 struct channel_info *channel_info;
320
321 channel_info = caa_container_of(node, struct channel_info,
322 channels_ht_node);
323
324 return !!((channel_key->key == channel_info->key.key) &&
325 (channel_key->domain == channel_info->key.domain));
326}
327
328static
329int match_condition(struct cds_lfht_node *node, const void *key)
330{
331 struct lttng_condition *condition_key = (struct lttng_condition *) key;
332 struct lttng_trigger_ht_element *trigger;
333 struct lttng_condition *condition;
334
335 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
336 node);
337 condition = lttng_trigger_get_condition(trigger->trigger);
338 assert(condition);
339
340 return !!lttng_condition_is_equal(condition_key, condition);
341}
342
ab0ee2ca
JG
343static
344int match_client_list_condition(struct cds_lfht_node *node, const void *key)
345{
346 struct lttng_condition *condition_key = (struct lttng_condition *) key;
347 struct notification_client_list *client_list;
348 struct lttng_condition *condition;
349
350 assert(condition_key);
351
352 client_list = caa_container_of(node, struct notification_client_list,
353 notification_trigger_ht_node);
354 condition = lttng_trigger_get_condition(client_list->trigger);
355
356 return !!lttng_condition_is_equal(condition_key, condition);
357}
358
359static
360unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 361 const struct lttng_condition *_condition)
ab0ee2ca 362{
9a2746aa
JG
363 unsigned long hash;
364 unsigned long condition_type;
ab0ee2ca
JG
365 struct lttng_condition_buffer_usage *condition;
366
367 condition = container_of(_condition,
368 struct lttng_condition_buffer_usage, parent);
369
9a2746aa
JG
370 condition_type = (unsigned long) condition->parent.type;
371 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
372 if (condition->session_name) {
373 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
374 }
375 if (condition->channel_name) {
8f56701f 376 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
377 }
378 if (condition->domain.set) {
379 hash ^= hash_key_ulong(
380 (void *) condition->domain.type,
381 lttng_ht_seed);
382 }
383 if (condition->threshold_ratio.set) {
384 uint64_t val;
385
386 val = condition->threshold_ratio.value * (double) UINT32_MAX;
387 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 388 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
389 uint64_t val;
390
391 val = condition->threshold_bytes.value;
392 hash ^= hash_key_u64(&val, lttng_ht_seed);
393 }
394 return hash;
395}
396
e8360425
JD
397static
398unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 399 const struct lttng_condition *_condition)
e8360425 400{
9a2746aa
JG
401 unsigned long hash;
402 unsigned long condition_type =
403 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
404 struct lttng_condition_session_consumed_size *condition;
405 uint64_t val;
406
407 condition = container_of(_condition,
408 struct lttng_condition_session_consumed_size, parent);
409
9a2746aa 410 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
411 if (condition->session_name) {
412 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
413 }
414 val = condition->consumed_threshold_bytes.value;
415 hash ^= hash_key_u64(&val, lttng_ht_seed);
416 return hash;
417}
418
a8393880
JG
419static
420unsigned long lttng_condition_session_rotation_hash(
421 const struct lttng_condition *_condition)
422{
423 unsigned long hash, condition_type;
424 struct lttng_condition_session_rotation *condition;
425
426 condition = container_of(_condition,
427 struct lttng_condition_session_rotation, parent);
428 condition_type = (unsigned long) condition->parent.type;
429 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
430 assert(condition->session_name);
431 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
432 return hash;
433}
9a2746aa 434
ab0ee2ca
JG
435/*
436 * The lttng_condition hashing code is kept in this file (rather than
437 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
438 * don't want to link in liblttng-ctl.
439 */
440static
9b63a4aa 441unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
442{
443 switch (condition->type) {
444 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
445 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
446 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
447 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
448 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
449 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
450 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
451 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
452 default:
453 ERR("[notification-thread] Unexpected condition type caught");
454 abort();
455 }
456}
457
e4db5ace
JR
458static
459unsigned long hash_channel_key(struct channel_key *key)
460{
461 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
462 unsigned long domain_hash = hash_key_ulong(
463 (void *) (unsigned long) key->domain, lttng_ht_seed);
464
465 return key_hash ^ domain_hash;
466}
467
ab0ee2ca
JG
468static
469void channel_info_destroy(struct channel_info *channel_info)
470{
471 if (!channel_info) {
472 return;
473 }
474
8abe313a
JG
475 if (channel_info->session_info) {
476 session_info_remove_channel(channel_info->session_info,
477 channel_info);
478 session_info_put(channel_info->session_info);
ab0ee2ca 479 }
8abe313a
JG
480 if (channel_info->name) {
481 free(channel_info->name);
ab0ee2ca
JG
482 }
483 free(channel_info);
484}
485
8abe313a 486/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 487static
8abe313a 488void session_info_destroy(void *_data)
ab0ee2ca 489{
8abe313a 490 struct session_info *session_info = _data;
3b68d0a3 491 int ret;
ab0ee2ca 492
8abe313a
JG
493 assert(session_info);
494 if (session_info->channel_infos_ht) {
3b68d0a3
JR
495 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
496 if (ret) {
4c3f302b 497 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 498 }
8abe313a 499 }
ea9a44f0 500 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
501
502 rcu_read_lock();
503 cds_lfht_del(session_info->sessions_ht,
504 &session_info->sessions_ht_node);
505 rcu_read_unlock();
8abe313a
JG
506 free(session_info->name);
507 free(session_info);
508}
ab0ee2ca 509
8abe313a
JG
510static
511void session_info_get(struct session_info *session_info)
512{
513 if (!session_info) {
514 return;
515 }
516 lttng_ref_get(&session_info->ref);
517}
518
519static
520void session_info_put(struct session_info *session_info)
521{
522 if (!session_info) {
523 return;
ab0ee2ca 524 }
8abe313a
JG
525 lttng_ref_put(&session_info->ref);
526}
527
528static
ea9a44f0 529struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
530 struct lttng_session_trigger_list *trigger_list,
531 struct cds_lfht *sessions_ht)
8abe313a
JG
532{
533 struct session_info *session_info;
ab0ee2ca 534
8abe313a 535 assert(name);
ab0ee2ca 536
8abe313a
JG
537 session_info = zmalloc(sizeof(*session_info));
538 if (!session_info) {
539 goto end;
540 }
541 lttng_ref_init(&session_info->ref, session_info_destroy);
542
543 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
544 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
545 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
546 goto error;
547 }
8abe313a
JG
548
549 cds_lfht_node_init(&session_info->sessions_ht_node);
550 session_info->name = strdup(name);
551 if (!session_info->name) {
ab0ee2ca
JG
552 goto error;
553 }
8abe313a
JG
554 session_info->uid = uid;
555 session_info->gid = gid;
ea9a44f0 556 session_info->trigger_list = trigger_list;
1eee26c5 557 session_info->sessions_ht = sessions_ht;
8abe313a
JG
558end:
559 return session_info;
560error:
561 session_info_put(session_info);
562 return NULL;
563}
564
565static
566void session_info_add_channel(struct session_info *session_info,
567 struct channel_info *channel_info)
568{
569 rcu_read_lock();
570 cds_lfht_add(session_info->channel_infos_ht,
571 hash_channel_key(&channel_info->key),
572 &channel_info->session_info_channels_ht_node);
573 rcu_read_unlock();
574}
575
576static
577void session_info_remove_channel(struct session_info *session_info,
578 struct channel_info *channel_info)
579{
580 rcu_read_lock();
581 cds_lfht_del(session_info->channel_infos_ht,
582 &channel_info->session_info_channels_ht_node);
583 rcu_read_unlock();
584}
585
586static
587struct channel_info *channel_info_create(const char *channel_name,
588 struct channel_key *channel_key, uint64_t channel_capacity,
589 struct session_info *session_info)
590{
591 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
592
593 if (!channel_info) {
594 goto end;
595 }
596
ab0ee2ca 597 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
598 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
599 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
600 channel_info->capacity = channel_capacity;
601
602 channel_info->name = strdup(channel_name);
603 if (!channel_info->name) {
604 goto error;
605 }
606
607 /*
608 * Set the references between session and channel infos:
609 * - channel_info holds a strong reference to session_info
610 * - session_info holds a weak reference to channel_info
611 */
612 session_info_get(session_info);
613 session_info_add_channel(session_info, channel_info);
614 channel_info->session_info = session_info;
ab0ee2ca 615end:
8abe313a 616 return channel_info;
ab0ee2ca 617error:
8abe313a 618 channel_info_destroy(channel_info);
ab0ee2ca
JG
619 return NULL;
620}
621
ed327204
JG
622/* RCU read lock must be held by the caller. */
623static
624struct notification_client_list *get_client_list_from_condition(
625 struct notification_thread_state *state,
626 const struct lttng_condition *condition)
627{
628 struct cds_lfht_node *node;
629 struct cds_lfht_iter iter;
630
631 cds_lfht_lookup(state->notification_trigger_clients_ht,
632 lttng_condition_hash(condition),
633 match_client_list_condition,
634 condition,
635 &iter);
636 node = cds_lfht_iter_get_node(&iter);
637
638 return node ? caa_container_of(node,
639 struct notification_client_list,
640 notification_trigger_ht_node) : NULL;
641}
642
e4db5ace
JR
643/* This function must be called with the RCU read lock held. */
644static
645int evaluate_condition_for_client(struct lttng_trigger *trigger,
646 struct lttng_condition *condition,
647 struct notification_client *client,
648 struct notification_thread_state *state)
649{
650 int ret;
651 struct cds_lfht_iter iter;
652 struct cds_lfht_node *node;
653 struct channel_info *channel_info = NULL;
654 struct channel_key *channel_key = NULL;
655 struct channel_state_sample *last_sample = NULL;
656 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
657 struct lttng_evaluation *evaluation = NULL;
658 struct notification_client_list client_list = { 0 };
659 struct notification_client_list_element client_list_element = { 0 };
660
661 assert(trigger);
662 assert(condition);
663 assert(client);
664 assert(state);
665
666 /* Find the channel associated with the trigger. */
667 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
668 channel_trigger_list , channel_triggers_ht_node) {
669 struct lttng_trigger_list_element *element;
670
671 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa
JG
672 const struct lttng_condition *current_condition =
673 lttng_trigger_get_const_condition(
e4db5ace
JR
674 element->trigger);
675
676 assert(current_condition);
677 if (!lttng_condition_is_equal(condition,
2ae99f0b 678 current_condition)) {
e4db5ace
JR
679 continue;
680 }
681
682 /* Found the trigger, save the channel key. */
683 channel_key = &channel_trigger_list->channel_key;
684 break;
685 }
686 if (channel_key) {
687 /* The channel key was found stop iteration. */
688 break;
689 }
690 }
691
692 if (!channel_key){
693 /* No channel found; normal exit. */
694 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
695 ret = 0;
696 goto end;
697 }
698
699 /* Fetch channel info for the matching channel. */
700 cds_lfht_lookup(state->channels_ht,
701 hash_channel_key(channel_key),
702 match_channel_info,
703 channel_key,
704 &iter);
705 node = cds_lfht_iter_get_node(&iter);
706 assert(node);
707 channel_info = caa_container_of(node, struct channel_info,
708 channels_ht_node);
709
710 /* Retrieve the channel's last sample, if it exists. */
711 cds_lfht_lookup(state->channel_state_ht,
712 hash_channel_key(channel_key),
713 match_channel_state_sample,
714 channel_key,
715 &iter);
716 node = cds_lfht_iter_get_node(&iter);
717 if (node) {
718 last_sample = caa_container_of(node,
719 struct channel_state_sample,
720 channel_state_ht_node);
721 } else {
722 /* Nothing to evaluate, no sample was ever taken. Normal exit */
723 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
724 ret = 0;
725 goto end;
726 }
727
ed327204 728 ret = evaluate_buffer_condition(condition, &evaluation, state,
e8360425
JD
729 NULL, last_sample,
730 0, channel_info->session_info->consumed_data_size,
731 channel_info);
e4db5ace 732 if (ret) {
066a3c82 733 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
734 goto end;
735 }
736
737 if (!evaluation) {
738 /* Evaluation yielded nothing. Normal exit. */
739 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
740 ret = 0;
741 goto end;
742 }
743
744 /*
745 * Create a temporary client list with the client currently
746 * subscribing.
747 */
748 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
749 CDS_INIT_LIST_HEAD(&client_list.list);
750 client_list.trigger = trigger;
751
752 CDS_INIT_LIST_HEAD(&client_list_element.node);
753 client_list_element.client = client;
754 cds_list_add(&client_list_element.node, &client_list.list);
755
756 /* Send evaluation result to the newly-subscribed client. */
757 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
758 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
8abe313a
JG
759 state, channel_info->session_info->uid,
760 channel_info->session_info->gid);
e4db5ace
JR
761
762end:
763 return ret;
764}
765
ab0ee2ca
JG
766static
767int notification_thread_client_subscribe(struct notification_client *client,
768 struct lttng_condition *condition,
769 struct notification_thread_state *state,
770 enum lttng_notification_channel_status *_status)
771{
772 int ret = 0;
ab0ee2ca
JG
773 struct notification_client_list *client_list;
774 struct lttng_condition_list_element *condition_list_element = NULL;
775 struct notification_client_list_element *client_list_element = NULL;
776 enum lttng_notification_channel_status status =
777 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
778
779 /*
780 * Ensure that the client has not already subscribed to this condition
781 * before.
782 */
783 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
784 if (lttng_condition_is_equal(condition_list_element->condition,
785 condition)) {
786 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
787 goto end;
788 }
789 }
790
791 condition_list_element = zmalloc(sizeof(*condition_list_element));
792 if (!condition_list_element) {
793 ret = -1;
794 goto error;
795 }
796 client_list_element = zmalloc(sizeof(*client_list_element));
797 if (!client_list_element) {
798 ret = -1;
799 goto error;
800 }
801
802 rcu_read_lock();
803
804 /*
805 * Add the newly-subscribed condition to the client's subscription list.
806 */
807 CDS_INIT_LIST_HEAD(&condition_list_element->node);
808 condition_list_element->condition = condition;
809 cds_list_add(&condition_list_element->node, &client->condition_list);
810
ed327204
JG
811 client_list = get_client_list_from_condition(state, condition);
812 if (!client_list) {
2ae99f0b
JG
813 /*
814 * No notification-emiting trigger registered with this
815 * condition. We don't evaluate the condition right away
816 * since this trigger is not registered yet.
817 */
4fb43b68 818 free(client_list_element);
ab0ee2ca
JG
819 goto end_unlock;
820 }
821
2ae99f0b
JG
822 /*
823 * The condition to which the client just subscribed is evaluated
824 * at this point so that conditions that are already TRUE result
825 * in a notification being sent out.
826 */
e4db5ace
JR
827 if (evaluate_condition_for_client(client_list->trigger, condition,
828 client, state)) {
829 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
830 ret = -1;
4bd5b4af 831 free(client_list_element);
e4db5ace
JR
832 goto end_unlock;
833 }
834
835 /*
836 * Add the client to the list of clients interested in a given trigger
837 * if a "notification" trigger with a corresponding condition was
838 * added prior.
839 */
ab0ee2ca
JG
840 client_list_element->client = client;
841 CDS_INIT_LIST_HEAD(&client_list_element->node);
842 cds_list_add(&client_list_element->node, &client_list->list);
843end_unlock:
844 rcu_read_unlock();
845end:
846 if (_status) {
847 *_status = status;
848 }
849 return ret;
850error:
851 free(condition_list_element);
852 free(client_list_element);
853 return ret;
854}
855
856static
857int notification_thread_client_unsubscribe(
858 struct notification_client *client,
859 struct lttng_condition *condition,
860 struct notification_thread_state *state,
861 enum lttng_notification_channel_status *_status)
862{
ab0ee2ca
JG
863 struct notification_client_list *client_list;
864 struct lttng_condition_list_element *condition_list_element,
865 *condition_tmp;
866 struct notification_client_list_element *client_list_element,
867 *client_tmp;
868 bool condition_found = false;
869 enum lttng_notification_channel_status status =
870 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
871
872 /* Remove the condition from the client's condition list. */
873 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
874 &client->condition_list, node) {
875 if (!lttng_condition_is_equal(condition_list_element->condition,
876 condition)) {
877 continue;
878 }
879
880 cds_list_del(&condition_list_element->node);
881 /*
882 * The caller may be iterating on the client's conditions to
883 * tear down a client's connection. In this case, the condition
884 * will be destroyed at the end.
885 */
886 if (condition != condition_list_element->condition) {
887 lttng_condition_destroy(
888 condition_list_element->condition);
889 }
890 free(condition_list_element);
891 condition_found = true;
892 break;
893 }
894
895 if (!condition_found) {
896 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
897 goto end;
898 }
899
900 /*
901 * Remove the client from the list of clients interested the trigger
902 * matching the condition.
903 */
904 rcu_read_lock();
ed327204
JG
905 client_list = get_client_list_from_condition(state, condition);
906 if (!client_list) {
ab0ee2ca
JG
907 goto end_unlock;
908 }
909
ab0ee2ca
JG
910 cds_list_for_each_entry_safe(client_list_element, client_tmp,
911 &client_list->list, node) {
912 if (client_list_element->client->socket != client->socket) {
913 continue;
914 }
915 cds_list_del(&client_list_element->node);
916 free(client_list_element);
917 break;
918 }
919end_unlock:
920 rcu_read_unlock();
921end:
922 lttng_condition_destroy(condition);
923 if (_status) {
924 *_status = status;
925 }
926 return 0;
927}
928
929static
930void notification_client_destroy(struct notification_client *client,
931 struct notification_thread_state *state)
932{
933 struct lttng_condition_list_element *condition_list_element, *tmp;
934
935 if (!client) {
936 return;
937 }
938
939 /* Release all conditions to which the client was subscribed. */
940 cds_list_for_each_entry_safe(condition_list_element, tmp,
941 &client->condition_list, node) {
942 (void) notification_thread_client_unsubscribe(client,
943 condition_list_element->condition, state, NULL);
944 }
945
946 if (client->socket >= 0) {
947 (void) lttcomm_close_unix_sock(client->socket);
948 }
949 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
950 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
951 free(client);
952}
953
954/*
955 * Call with rcu_read_lock held (and hold for the lifetime of the returned
956 * client pointer).
957 */
958static
959struct notification_client *get_client_from_socket(int socket,
960 struct notification_thread_state *state)
961{
962 struct cds_lfht_iter iter;
963 struct cds_lfht_node *node;
964 struct notification_client *client = NULL;
965
966 cds_lfht_lookup(state->client_socket_ht,
967 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
968 match_client,
969 (void *) (unsigned long) socket,
970 &iter);
971 node = cds_lfht_iter_get_node(&iter);
972 if (!node) {
973 goto end;
974 }
975
976 client = caa_container_of(node, struct notification_client,
977 client_socket_ht_node);
978end:
979 return client;
980}
981
982static
e8360425 983bool buffer_usage_condition_applies_to_channel(
51eab943
JG
984 const struct lttng_condition *condition,
985 const struct channel_info *channel_info)
ab0ee2ca
JG
986{
987 enum lttng_condition_status status;
e8360425
JD
988 enum lttng_domain_type condition_domain;
989 const char *condition_session_name = NULL;
990 const char *condition_channel_name = NULL;
ab0ee2ca 991
e8360425
JD
992 status = lttng_condition_buffer_usage_get_domain_type(condition,
993 &condition_domain);
994 assert(status == LTTNG_CONDITION_STATUS_OK);
995 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
996 goto fail;
997 }
998
e8360425
JD
999 status = lttng_condition_buffer_usage_get_session_name(
1000 condition, &condition_session_name);
1001 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1002
1003 status = lttng_condition_buffer_usage_get_channel_name(
1004 condition, &condition_channel_name);
1005 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1006
1007 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1008 goto fail;
1009 }
1010 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1011 goto fail;
1012 }
1013
e8360425
JD
1014 return true;
1015fail:
1016 return false;
1017}
1018
1019static
1020bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1021 const struct lttng_condition *condition,
1022 const struct channel_info *channel_info)
e8360425
JD
1023{
1024 enum lttng_condition_status status;
1025 const char *condition_session_name = NULL;
1026
1027 status = lttng_condition_session_consumed_size_get_session_name(
1028 condition, &condition_session_name);
1029 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1030
1031 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1032 goto fail;
1033 }
1034
e8360425
JD
1035 return true;
1036fail:
1037 return false;
1038}
ab0ee2ca 1039
51eab943
JG
1040/*
1041 * Get the type of object to which a given trigger applies. Binding lets
1042 * the notification system evaluate a trigger's condition when a given
1043 * object's state is updated.
1044 *
1045 * For instance, a condition bound to a channel will be evaluated everytime
1046 * the channel's state is changed by a channel monitoring sample.
1047 */
e8360425 1048static
51eab943
JG
1049enum lttng_object_type get_trigger_binding_object(
1050 const struct lttng_trigger *trigger)
e8360425 1051{
51eab943
JG
1052 const struct lttng_condition *condition;
1053
1054 condition = lttng_trigger_get_const_condition(trigger);
1055 switch (lttng_condition_get_type(condition)) {
1056 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1057 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1058 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1059 return LTTNG_OBJECT_TYPE_CHANNEL;
1060 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1061 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1062 return LTTNG_OBJECT_TYPE_SESSION;
1063 default:
1064 return LTTNG_OBJECT_TYPE_UNKNOWN;
1065 }
1066}
1067
1068static
1069bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1070 const struct channel_info *channel_info)
1071{
1072 const struct lttng_condition *condition;
e8360425 1073 bool trigger_applies;
ab0ee2ca 1074
51eab943 1075 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1076 if (!condition) {
ab0ee2ca
JG
1077 goto fail;
1078 }
e8360425
JD
1079
1080 switch (lttng_condition_get_type(condition)) {
1081 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1082 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1083 trigger_applies = buffer_usage_condition_applies_to_channel(
1084 condition, channel_info);
1085 break;
1086 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1087 trigger_applies = session_consumed_size_condition_applies_to_channel(
1088 condition, channel_info);
1089 break;
1090 default:
ab0ee2ca
JG
1091 goto fail;
1092 }
1093
e8360425 1094 return trigger_applies;
ab0ee2ca
JG
1095fail:
1096 return false;
1097}
1098
1099static
1100bool trigger_applies_to_client(struct lttng_trigger *trigger,
1101 struct notification_client *client)
1102{
1103 bool applies = false;
1104 struct lttng_condition_list_element *condition_list_element;
1105
1106 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1107 node) {
1108 applies = lttng_condition_is_equal(
1109 condition_list_element->condition,
1110 lttng_trigger_get_condition(trigger));
1111 if (applies) {
1112 break;
1113 }
1114 }
1115 return applies;
1116}
1117
8abe313a
JG
1118static
1119int match_session(struct cds_lfht_node *node, const void *key)
1120{
1121 const char *name = key;
1122 struct session_info *session_info = caa_container_of(
1123 node, struct session_info, sessions_ht_node);
1124
1125 return !strcmp(session_info->name, name);
1126}
1127
ed327204
JG
1128/* Must be called with RCU read lock held. */
1129static
1130struct lttng_session_trigger_list *get_session_trigger_list(
1131 struct notification_thread_state *state,
1132 const char *session_name)
1133{
1134 struct lttng_session_trigger_list *list = NULL;
1135 struct cds_lfht_node *node;
1136 struct cds_lfht_iter iter;
1137
1138 cds_lfht_lookup(state->session_triggers_ht,
1139 hash_key_str(session_name, lttng_ht_seed),
1140 match_session_trigger_list,
1141 session_name,
1142 &iter);
1143 node = cds_lfht_iter_get_node(&iter);
1144 if (!node) {
1145 /*
1146 * Not an error, the list of triggers applying to that session
1147 * will be initialized when the session is created.
1148 */
1149 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1150 session_name);
1151 goto end;
1152 }
1153
1154 list = caa_container_of(node,
1155 struct lttng_session_trigger_list,
1156 session_triggers_ht_node);
1157end:
1158 return list;
1159}
1160
ea9a44f0
JG
1161/*
1162 * Allocate an empty lttng_session_trigger_list for the session named
1163 * 'session_name'.
1164 *
1165 * No ownership of 'session_name' is assumed by the session trigger list.
1166 * It is the caller's responsability to ensure the session name is alive
1167 * for as long as this list is.
1168 */
1169static
1170struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1171 const char *session_name,
1172 struct cds_lfht *session_triggers_ht)
1173{
1174 struct lttng_session_trigger_list *list;
1175
1176 list = zmalloc(sizeof(*list));
1177 if (!list) {
1178 goto end;
1179 }
1180 list->session_name = session_name;
1181 CDS_INIT_LIST_HEAD(&list->list);
1182 cds_lfht_node_init(&list->session_triggers_ht_node);
1183 list->session_triggers_ht = session_triggers_ht;
1184
1185 rcu_read_lock();
1186 /* Publish the list through the session_triggers_ht. */
1187 cds_lfht_add(session_triggers_ht,
1188 hash_key_str(session_name, lttng_ht_seed),
1189 &list->session_triggers_ht_node);
1190 rcu_read_unlock();
1191end:
1192 return list;
1193}
1194
1195static
1196void free_session_trigger_list_rcu(struct rcu_head *node)
1197{
1198 free(caa_container_of(node, struct lttng_session_trigger_list,
1199 rcu_node));
1200}
1201
1202static
1203void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1204{
1205 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1206
1207 /* Empty the list element by element, and then free the list itself. */
1208 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1209 &list->list, node) {
1210 cds_list_del(&trigger_list_element->node);
1211 free(trigger_list_element);
1212 }
1213 rcu_read_lock();
1214 /* Unpublish the list from the session_triggers_ht. */
1215 cds_lfht_del(list->session_triggers_ht,
1216 &list->session_triggers_ht_node);
1217 rcu_read_unlock();
1218 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1219}
1220
1221static
1222int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1223 const struct lttng_trigger *trigger)
1224{
1225 int ret = 0;
1226 struct lttng_trigger_list_element *new_element =
1227 zmalloc(sizeof(*new_element));
1228
1229 if (!new_element) {
1230 ret = -1;
1231 goto end;
1232 }
1233 CDS_INIT_LIST_HEAD(&new_element->node);
1234 new_element->trigger = trigger;
1235 cds_list_add(&new_element->node, &list->list);
1236end:
1237 return ret;
1238}
1239
1240static
1241bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1242 const char *session_name)
1243{
1244 bool applies = false;
1245 const struct lttng_condition *condition;
1246
1247 condition = lttng_trigger_get_const_condition(trigger);
1248 switch (lttng_condition_get_type(condition)) {
1249 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1250 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1251 {
1252 enum lttng_condition_status condition_status;
1253 const char *condition_session_name;
1254
1255 condition_status = lttng_condition_session_rotation_get_session_name(
1256 condition, &condition_session_name);
1257 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1258 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1259 goto end;
1260 }
1261
1262 assert(condition_session_name);
1263 applies = !strcmp(condition_session_name, session_name);
1264 break;
1265 }
1266 default:
1267 goto end;
1268 }
1269end:
1270 return applies;
1271}
1272
1273/*
1274 * Allocate and initialize an lttng_session_trigger_list which contains
1275 * all triggers that apply to the session named 'session_name'.
1276 *
1277 * No ownership of 'session_name' is assumed by the session trigger list.
1278 * It is the caller's responsability to ensure the session name is alive
1279 * for as long as this list is.
1280 */
1281static
1282struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1283 const struct notification_thread_state *state,
1284 const char *session_name)
1285{
1286 int trigger_count = 0;
1287 struct lttng_session_trigger_list *session_trigger_list = NULL;
1288 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1289 struct cds_lfht_iter iter;
1290
1291 session_trigger_list = lttng_session_trigger_list_create(session_name,
1292 state->session_triggers_ht);
1293
1294 /* Add all triggers applying to the session named 'session_name'. */
1295 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1296 node) {
1297 int ret;
1298
1299 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1300 session_name)) {
1301 continue;
1302 }
1303
1304 ret = lttng_session_trigger_list_add(session_trigger_list,
1305 trigger_ht_element->trigger);
1306 if (ret) {
1307 goto error;
1308 }
1309
1310 trigger_count++;
1311 }
1312
1313 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1314 trigger_count);
1315 return session_trigger_list;
1316error:
1317 lttng_session_trigger_list_destroy(session_trigger_list);
1318 return NULL;
1319}
1320
8abe313a
JG
1321static
1322struct session_info *find_or_create_session_info(
1323 struct notification_thread_state *state,
1324 const char *name, uid_t uid, gid_t gid)
1325{
1326 struct session_info *session = NULL;
1327 struct cds_lfht_node *node;
1328 struct cds_lfht_iter iter;
ea9a44f0 1329 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1330
1331 rcu_read_lock();
1332 cds_lfht_lookup(state->sessions_ht,
1333 hash_key_str(name, lttng_ht_seed),
1334 match_session,
1335 name,
1336 &iter);
1337 node = cds_lfht_iter_get_node(&iter);
1338 if (node) {
1339 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1340 name, uid, gid);
1341 session = caa_container_of(node, struct session_info,
1342 sessions_ht_node);
1343 assert(session->uid == uid);
1344 assert(session->gid == gid);
1eee26c5
JG
1345 session_info_get(session);
1346 goto end;
ea9a44f0
JG
1347 }
1348
1349 trigger_list = lttng_session_trigger_list_build(state, name);
1350 if (!trigger_list) {
1351 goto error;
8abe313a
JG
1352 }
1353
1eee26c5
JG
1354 session = session_info_create(name, uid, gid, trigger_list,
1355 state->sessions_ht);
8abe313a
JG
1356 if (!session) {
1357 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1358 name, uid, gid);
ea9a44f0 1359 goto error;
8abe313a 1360 }
ea9a44f0 1361 trigger_list = NULL;
577983cb
JG
1362
1363 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1364 &session->sessions_ht_node);
1eee26c5 1365end:
8abe313a
JG
1366 rcu_read_unlock();
1367 return session;
ea9a44f0
JG
1368error:
1369 rcu_read_unlock();
1370 session_info_put(session);
1371 return NULL;
8abe313a
JG
1372}
1373
ab0ee2ca
JG
1374static
1375int handle_notification_thread_command_add_channel(
8abe313a
JG
1376 struct notification_thread_state *state,
1377 const char *session_name, uid_t session_uid, gid_t session_gid,
1378 const char *channel_name, enum lttng_domain_type channel_domain,
1379 uint64_t channel_key_int, uint64_t channel_capacity,
1380 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1381{
1382 struct cds_list_head trigger_list;
8abe313a
JG
1383 struct channel_info *new_channel_info = NULL;
1384 struct channel_key channel_key = {
1385 .key = channel_key_int,
1386 .domain = channel_domain,
1387 };
ab0ee2ca
JG
1388 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1389 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1390 int trigger_count = 0;
1391 struct cds_lfht_iter iter;
8abe313a 1392 struct session_info *session_info = NULL;
ab0ee2ca
JG
1393
1394 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1395 channel_name, session_name, channel_key_int,
1396 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1397
1398 CDS_INIT_LIST_HEAD(&trigger_list);
1399
8abe313a
JG
1400 session_info = find_or_create_session_info(state, session_name,
1401 session_uid, session_gid);
1402 if (!session_info) {
1403 /* Allocation error or an internal error occured. */
ab0ee2ca
JG
1404 goto error;
1405 }
1406
8abe313a
JG
1407 new_channel_info = channel_info_create(channel_name, &channel_key,
1408 channel_capacity, session_info);
1409 if (!new_channel_info) {
1410 goto error;
1411 }
ab0ee2ca
JG
1412
1413 /* Build a list of all triggers applying to the new channel. */
1414 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1415 node) {
1416 struct lttng_trigger_list_element *new_element;
1417
1418 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1419 new_channel_info)) {
ab0ee2ca
JG
1420 continue;
1421 }
1422
1423 new_element = zmalloc(sizeof(*new_element));
1424 if (!new_element) {
1425 goto error;
1426 }
1427 CDS_INIT_LIST_HEAD(&new_element->node);
1428 new_element->trigger = trigger_ht_element->trigger;
1429 cds_list_add(&new_element->node, &trigger_list);
1430 trigger_count++;
1431 }
1432
1433 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1434 trigger_count);
1435 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1436 if (!channel_trigger_list) {
1437 goto error;
1438 }
8abe313a 1439 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1440 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1441 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1442 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1443
1444 rcu_read_lock();
1445 /* Add channel to the channel_ht which owns the channel_infos. */
1446 cds_lfht_add(state->channels_ht,
8abe313a 1447 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1448 &new_channel_info->channels_ht_node);
1449 /*
1450 * Add the list of triggers associated with this channel to the
1451 * channel_triggers_ht.
1452 */
1453 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1454 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1455 &channel_trigger_list->channel_triggers_ht_node);
1456 rcu_read_unlock();
1eee26c5 1457 session_info_put(session_info);
ab0ee2ca
JG
1458 *cmd_result = LTTNG_OK;
1459 return 0;
1460error:
ab0ee2ca 1461 channel_info_destroy(new_channel_info);
8abe313a 1462 session_info_put(session_info);
ab0ee2ca
JG
1463 return 1;
1464}
1465
1466static
1467int handle_notification_thread_command_remove_channel(
1468 struct notification_thread_state *state,
1469 uint64_t channel_key, enum lttng_domain_type domain,
1470 enum lttng_error_code *cmd_result)
1471{
1472 struct cds_lfht_node *node;
1473 struct cds_lfht_iter iter;
1474 struct lttng_channel_trigger_list *trigger_list;
1475 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1476 struct channel_key key = { .key = channel_key, .domain = domain };
1477 struct channel_info *channel_info;
1478
1479 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1480 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1481
1482 rcu_read_lock();
1483
1484 cds_lfht_lookup(state->channel_triggers_ht,
1485 hash_channel_key(&key),
1486 match_channel_trigger_list,
1487 &key,
1488 &iter);
1489 node = cds_lfht_iter_get_node(&iter);
1490 /*
1491 * There is a severe internal error if we are being asked to remove a
1492 * channel that doesn't exist.
1493 */
1494 if (!node) {
1495 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1496 goto end;
1497 }
1498
1499 /* Free the list of triggers associated with this channel. */
1500 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1501 channel_triggers_ht_node);
1502 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1503 &trigger_list->list, node) {
1504 cds_list_del(&trigger_list_element->node);
1505 free(trigger_list_element);
1506 }
1507 cds_lfht_del(state->channel_triggers_ht, node);
1508 free(trigger_list);
1509
1510 /* Free sampled channel state. */
1511 cds_lfht_lookup(state->channel_state_ht,
1512 hash_channel_key(&key),
1513 match_channel_state_sample,
1514 &key,
1515 &iter);
1516 node = cds_lfht_iter_get_node(&iter);
1517 /*
1518 * This is expected to be NULL if the channel is destroyed before we
1519 * received a sample.
1520 */
1521 if (node) {
1522 struct channel_state_sample *sample = caa_container_of(node,
1523 struct channel_state_sample,
1524 channel_state_ht_node);
1525
1526 cds_lfht_del(state->channel_state_ht, node);
1527 free(sample);
1528 }
1529
1530 /* Remove the channel from the channels_ht and free it. */
1531 cds_lfht_lookup(state->channels_ht,
1532 hash_channel_key(&key),
1533 match_channel_info,
1534 &key,
1535 &iter);
1536 node = cds_lfht_iter_get_node(&iter);
1537 assert(node);
1538 channel_info = caa_container_of(node, struct channel_info,
1539 channels_ht_node);
1540 cds_lfht_del(state->channels_ht, node);
1541 channel_info_destroy(channel_info);
1542end:
1543 rcu_read_unlock();
1544 *cmd_result = LTTNG_OK;
1545 return 0;
1546}
1547
0ca52944 1548static
ed327204 1549int handle_notification_thread_command_session_rotation(
0ca52944 1550 struct notification_thread_state *state,
ed327204
JG
1551 enum notification_thread_command_type cmd_type,
1552 const char *session_name, uid_t session_uid, gid_t session_gid,
1553 uint64_t trace_archive_chunk_id,
1554 struct lttng_trace_archive_location *location,
1555 enum lttng_error_code *_cmd_result)
0ca52944 1556{
ed327204
JG
1557 int ret = 0;
1558 enum lttng_error_code cmd_result = LTTNG_OK;
1559 struct lttng_session_trigger_list *trigger_list;
1560 struct lttng_trigger_list_element *trigger_list_element;
1561 struct session_info *session_info;
0ca52944 1562
ed327204
JG
1563 rcu_read_lock();
1564
1565 session_info = find_or_create_session_info(state, session_name,
1566 session_uid, session_gid);
1567 if (!session_info) {
1568 /* Allocation error or an internal error occured. */
1569 ret = -1;
1570 cmd_result = LTTNG_ERR_NOMEM;
1571 goto end;
1572 }
1573
1574 session_info->rotation.ongoing =
1575 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1576 session_info->rotation.id = trace_archive_chunk_id;
1577 trigger_list = get_session_trigger_list(state, session_name);
1578 if (!trigger_list) {
1579 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1580 session_name);
1581 goto end;
1582 }
1583
1584 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1585 node) {
1586 const struct lttng_condition *condition;
1587 const struct lttng_action *action;
1588 const struct lttng_trigger *trigger;
1589 struct notification_client_list *client_list;
1590 struct lttng_evaluation *evaluation = NULL;
1591 enum lttng_condition_type condition_type;
1592
1593 trigger = trigger_list_element->trigger;
1594 condition = lttng_trigger_get_const_condition(trigger);
1595 assert(condition);
1596 condition_type = lttng_condition_get_type(condition);
1597
1598 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1599 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1600 continue;
1601 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1602 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1603 continue;
1604 }
1605
1606 action = lttng_trigger_get_const_action(trigger);
1607
1608 /* Notify actions are the only type currently supported. */
1609 assert(lttng_action_get_type_const(action) ==
1610 LTTNG_ACTION_TYPE_NOTIFY);
1611
1612 client_list = get_client_list_from_condition(state, condition);
1613 assert(client_list);
1614
1615 if (cds_list_empty(&client_list->list)) {
1616 /*
1617 * No clients interested in the evaluation's result,
1618 * skip it.
1619 */
1620 continue;
1621 }
1622
1623 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1624 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1625 trace_archive_chunk_id);
1626 } else {
1627 evaluation = lttng_evaluation_session_rotation_completed_create(
1628 trace_archive_chunk_id, location);
1629 }
1630
1631 if (!evaluation) {
1632 /* Internal error */
1633 ret = -1;
1634 cmd_result = LTTNG_ERR_UNK;
1635 goto end;
1636 }
1637
1638 /* Dispatch evaluation result to all clients. */
1639 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1640 evaluation, client_list, state,
1641 session_info->uid,
1642 session_info->gid);
1643 lttng_evaluation_destroy(evaluation);
1644 if (caa_unlikely(ret)) {
1645 goto end;
1646 }
1647 }
1648end:
1649 session_info_put(session_info);
1650 *_cmd_result = cmd_result;
1651 rcu_read_unlock();
1652 return ret;
0ca52944
JG
1653}
1654
1da26331
JG
1655static
1656int condition_is_supported(struct lttng_condition *condition)
1657{
1658 int ret;
1659
1660 switch (lttng_condition_get_type(condition)) {
1661 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1662 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1663 {
1664 enum lttng_domain_type domain;
1665
1666 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1667 &domain);
1668 if (ret) {
1669 ret = -1;
1670 goto end;
1671 }
1672
1673 if (domain != LTTNG_DOMAIN_KERNEL) {
1674 ret = 1;
1675 goto end;
1676 }
1677
1678 /*
1679 * Older kernel tracers don't expose the API to monitor their
1680 * buffers. Therefore, we reject triggers that require that
1681 * mechanism to be available to be evaluated.
1682 */
1683 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1684 kernel_tracer_fd);
1685 break;
1686 }
1687 default:
1688 ret = 1;
1689 }
1690end:
1691 return ret;
1692}
1693
51eab943
JG
1694/* Must be called with RCU read lock held. */
1695static
1696int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1697 struct notification_thread_state *state)
1698{
1699 int ret = 0;
51eab943
JG
1700 const struct lttng_condition *condition;
1701 const char *session_name;
1702 struct lttng_session_trigger_list *trigger_list;
1703
1704 condition = lttng_trigger_get_const_condition(trigger);
1705 switch (lttng_condition_get_type(condition)) {
1706 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1707 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1708 {
1709 enum lttng_condition_status status;
1710
1711 status = lttng_condition_session_rotation_get_session_name(
1712 condition, &session_name);
1713 if (status != LTTNG_CONDITION_STATUS_OK) {
1714 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1715 ret = -1;
1716 goto end;
1717 }
1718 break;
1719 }
1720 default:
1721 ret = -1;
1722 goto end;
1723 }
1724
ed327204
JG
1725 trigger_list = get_session_trigger_list(state, session_name);
1726 if (!trigger_list) {
51eab943
JG
1727 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1728 session_name);
1729 goto end;
51eab943 1730
ed327204 1731 }
51eab943
JG
1732
1733 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1734 session_name);
1735 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1736end:
1737 return ret;
1738}
1739
1740/* Must be called with RCU read lock held. */
1741static
1742int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1743 struct notification_thread_state *state)
1744{
1745 int ret = 0;
1746 struct cds_lfht_node *node;
1747 struct cds_lfht_iter iter;
1748 struct channel_info *channel;
1749
1750 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1751 channels_ht_node) {
1752 struct lttng_trigger_list_element *trigger_list_element;
1753 struct lttng_channel_trigger_list *trigger_list;
1754
1755 if (!trigger_applies_to_channel(trigger, channel)) {
1756 continue;
1757 }
1758
1759 cds_lfht_lookup(state->channel_triggers_ht,
1760 hash_channel_key(&channel->key),
1761 match_channel_trigger_list,
1762 &channel->key,
1763 &iter);
1764 node = cds_lfht_iter_get_node(&iter);
1765 assert(node);
1766 trigger_list = caa_container_of(node,
1767 struct lttng_channel_trigger_list,
1768 channel_triggers_ht_node);
1769
1770 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1771 if (!trigger_list_element) {
1772 ret = -1;
1773 goto end;
1774 }
1775 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1776 trigger_list_element->trigger = trigger;
1777 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1778 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1779 channel->name);
1780 }
1781end:
1782 return ret;
1783}
1784
ab0ee2ca
JG
1785/*
1786 * FIXME A client's credentials are not checked when registering a trigger, nor
1787 * are they stored alongside with the trigger.
1788 *
1da26331 1789 * The effects of this are benign since:
ab0ee2ca 1790 * - The client will succeed in registering the trigger, as it is valid,
51eab943 1791 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
1792 * - The notifications will not be sent since the client's credentials
1793 * are checked against the channel at that moment.
1da26331
JG
1794 *
1795 * If this function returns a non-zero value, it means something is
50ca7858 1796 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1797 *
1798 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1799 * error code.
ab0ee2ca
JG
1800 */
1801static
1802int handle_notification_thread_command_register_trigger(
8abe313a
JG
1803 struct notification_thread_state *state,
1804 struct lttng_trigger *trigger,
1805 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1806{
1807 int ret = 0;
1808 struct lttng_condition *condition;
1809 struct notification_client *client;
1810 struct notification_client_list *client_list = NULL;
1811 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1812 struct notification_client_list_element *client_list_element, *tmp;
1813 struct cds_lfht_node *node;
1814 struct cds_lfht_iter iter;
ab0ee2ca
JG
1815 bool free_trigger = true;
1816
1817 rcu_read_lock();
1818
1819 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1820 assert(condition);
1821
1822 ret = condition_is_supported(condition);
1823 if (ret < 0) {
1824 goto error;
1825 } else if (ret == 0) {
1826 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1827 goto error;
1828 } else {
1829 /* Feature is supported, continue. */
1830 ret = 0;
1831 }
1832
ab0ee2ca
JG
1833 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1834 if (!trigger_ht_element) {
1835 ret = -1;
1836 goto error;
1837 }
1838
1839 /* Add trigger to the trigger_ht. */
1840 cds_lfht_node_init(&trigger_ht_element->node);
1841 trigger_ht_element->trigger = trigger;
1842
1843 node = cds_lfht_add_unique(state->triggers_ht,
1844 lttng_condition_hash(condition),
1845 match_condition,
1846 condition,
1847 &trigger_ht_element->node);
1848 if (node != &trigger_ht_element->node) {
1849 /* Not a fatal error, simply report it to the client. */
1850 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1851 goto error_free_ht_element;
1852 }
1853
1854 /*
1855 * Ownership of the trigger and of its wrapper was transfered to
1856 * the triggers_ht.
1857 */
1858 trigger_ht_element = NULL;
1859 free_trigger = false;
1860
1861 /*
1862 * The rest only applies to triggers that have a "notify" action.
1863 * It is not skipped as this is the only action type currently
1864 * supported.
1865 */
1866 client_list = zmalloc(sizeof(*client_list));
1867 if (!client_list) {
1868 ret = -1;
1869 goto error_free_ht_element;
1870 }
1871 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1872 CDS_INIT_LIST_HEAD(&client_list->list);
1873 client_list->trigger = trigger;
1874
1875 /* Build a list of clients to which this new trigger applies. */
1876 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1877 client_socket_ht_node) {
1878 if (!trigger_applies_to_client(trigger, client)) {
1879 continue;
1880 }
1881
1882 client_list_element = zmalloc(sizeof(*client_list_element));
1883 if (!client_list_element) {
1884 ret = -1;
1885 goto error_free_client_list;
1886 }
1887 CDS_INIT_LIST_HEAD(&client_list_element->node);
1888 client_list_element->client = client;
1889 cds_list_add(&client_list_element->node, &client_list->list);
1890 }
1891
1892 cds_lfht_add(state->notification_trigger_clients_ht,
1893 lttng_condition_hash(condition),
1894 &client_list->notification_trigger_ht_node);
ab0ee2ca 1895
51eab943
JG
1896 switch (get_trigger_binding_object(trigger)) {
1897 case LTTNG_OBJECT_TYPE_SESSION:
1898 /* Add the trigger to the list if it matches a known session. */
1899 ret = bind_trigger_to_matching_session(trigger, state);
1900 if (ret) {
1901 goto error_free_client_list;
ab0ee2ca 1902 }
51eab943
JG
1903 case LTTNG_OBJECT_TYPE_CHANNEL:
1904 /*
1905 * Add the trigger to list of triggers bound to the channels
1906 * currently known.
1907 */
1908 ret = bind_trigger_to_matching_channels(trigger, state);
1909 if (ret) {
ab0ee2ca
JG
1910 goto error_free_client_list;
1911 }
51eab943
JG
1912 break;
1913 case LTTNG_OBJECT_TYPE_NONE:
1914 break;
1915 default:
1916 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
1917 ret = -1;
1918 goto error_free_client_list;
ab0ee2ca
JG
1919 }
1920
2ae99f0b
JG
1921 /*
1922 * Since there is nothing preventing clients from subscribing to a
1923 * condition before the corresponding trigger is registered, we have
1924 * to evaluate this new condition right away.
1925 *
1926 * At some point, we were waiting for the next "evaluation" (e.g. on
1927 * reception of a channel sample) to evaluate this new condition, but
1928 * that was broken.
1929 *
1930 * The reason it was broken is that waiting for the next sample
1931 * does not allow us to properly handle transitions for edge-triggered
1932 * conditions.
1933 *
1934 * Consider this example: when we handle a new channel sample, we
1935 * evaluate each conditions twice: once with the previous state, and
1936 * again with the newest state. We then use those two results to
1937 * determine whether a state change happened: a condition was false and
1938 * became true. If a state change happened, we have to notify clients.
1939 *
1940 * Now, if a client subscribes to a given notification and registers
1941 * a trigger *after* that subscription, we have to make sure the
1942 * condition is evaluated at this point while considering only the
1943 * current state. Otherwise, the next evaluation cycle may only see
1944 * that the evaluations remain the same (true for samples n-1 and n) and
1945 * the client will never know that the condition has been met.
1946 */
1947 cds_list_for_each_entry_safe(client_list_element, tmp,
1948 &client_list->list, node) {
1949 ret = evaluate_condition_for_client(trigger, condition,
1950 client_list_element->client, state);
1951 if (ret) {
1952 goto error_free_client_list;
1953 }
1954 }
1955
1956 /*
1957 * Client list ownership transferred to the
1958 * notification_trigger_clients_ht.
1959 */
1960 client_list = NULL;
1961
ab0ee2ca
JG
1962 *cmd_result = LTTNG_OK;
1963error_free_client_list:
1964 if (client_list) {
1965 cds_list_for_each_entry_safe(client_list_element, tmp,
1966 &client_list->list, node) {
1967 free(client_list_element);
1968 }
1969 free(client_list);
1970 }
1971error_free_ht_element:
1972 free(trigger_ht_element);
1973error:
1974 if (free_trigger) {
1975 struct lttng_action *action = lttng_trigger_get_action(trigger);
1976
1977 lttng_condition_destroy(condition);
1978 lttng_action_destroy(action);
1979 lttng_trigger_destroy(trigger);
1980 }
1981 rcu_read_unlock();
1982 return ret;
1983}
1984
cc2295b5 1985static
ab0ee2ca
JG
1986int handle_notification_thread_command_unregister_trigger(
1987 struct notification_thread_state *state,
1988 struct lttng_trigger *trigger,
1989 enum lttng_error_code *_cmd_reply)
1990{
1991 struct cds_lfht_iter iter;
ed327204 1992 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
1993 struct lttng_channel_trigger_list *trigger_list;
1994 struct notification_client_list *client_list;
1995 struct notification_client_list_element *client_list_element, *tmp;
1996 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1997 struct lttng_condition *condition = lttng_trigger_get_condition(
1998 trigger);
1999 struct lttng_action *action;
2000 enum lttng_error_code cmd_reply;
2001
2002 rcu_read_lock();
2003
2004 cds_lfht_lookup(state->triggers_ht,
2005 lttng_condition_hash(condition),
2006 match_condition,
2007 condition,
2008 &iter);
2009 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2010 if (!triggers_ht_node) {
2011 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2012 goto end;
2013 } else {
2014 cmd_reply = LTTNG_OK;
2015 }
2016
2017 /* Remove trigger from channel_triggers_ht. */
2018 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2019 channel_triggers_ht_node) {
2020 struct lttng_trigger_list_element *trigger_element, *tmp;
2021
2022 cds_list_for_each_entry_safe(trigger_element, tmp,
2023 &trigger_list->list, node) {
9b63a4aa
JG
2024 const struct lttng_condition *current_condition =
2025 lttng_trigger_get_const_condition(
ab0ee2ca
JG
2026 trigger_element->trigger);
2027
2028 assert(current_condition);
2029 if (!lttng_condition_is_equal(condition,
2030 current_condition)) {
2031 continue;
2032 }
2033
2034 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2035 cds_list_del(&trigger_element->node);
e4db5ace
JR
2036 /* A trigger can only appear once per channel */
2037 break;
ab0ee2ca
JG
2038 }
2039 }
2040
2041 /*
2042 * Remove and release the client list from
2043 * notification_trigger_clients_ht.
2044 */
ed327204
JG
2045 client_list = get_client_list_from_condition(state, condition);
2046 assert(client_list);
2047
ab0ee2ca
JG
2048 cds_list_for_each_entry_safe(client_list_element, tmp,
2049 &client_list->list, node) {
2050 free(client_list_element);
2051 }
ed327204
JG
2052 cds_lfht_del(state->notification_trigger_clients_ht,
2053 &client_list->notification_trigger_ht_node);
ab0ee2ca
JG
2054 free(client_list);
2055
2056 /* Remove trigger from triggers_ht. */
2057 trigger_ht_element = caa_container_of(triggers_ht_node,
2058 struct lttng_trigger_ht_element, node);
2059 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2060
2061 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
2062 lttng_condition_destroy(condition);
2063 action = lttng_trigger_get_action(trigger_ht_element->trigger);
2064 lttng_action_destroy(action);
2065 lttng_trigger_destroy(trigger_ht_element->trigger);
2066 free(trigger_ht_element);
2067end:
2068 rcu_read_unlock();
2069 if (_cmd_reply) {
2070 *_cmd_reply = cmd_reply;
2071 }
2072 return 0;
2073}
2074
2075/* Returns 0 on success, 1 on exit requested, negative value on error. */
2076int handle_notification_thread_command(
2077 struct notification_thread_handle *handle,
2078 struct notification_thread_state *state)
2079{
2080 int ret;
2081 uint64_t counter;
2082 struct notification_thread_command *cmd;
2083
8abe313a
JG
2084 /* Read the event pipe to put it back into a quiescent state. */
2085 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2086 sizeof(counter));
ab0ee2ca
JG
2087 if (ret == -1) {
2088 goto error;
2089 }
2090
2091 pthread_mutex_lock(&handle->cmd_queue.lock);
2092 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2093 struct notification_thread_command, cmd_list_node);
2094 switch (cmd->type) {
2095 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2096 DBG("[notification-thread] Received register trigger command");
2097 ret = handle_notification_thread_command_register_trigger(
2098 state, cmd->parameters.trigger,
2099 &cmd->reply_code);
2100 break;
2101 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2102 DBG("[notification-thread] Received unregister trigger command");
2103 ret = handle_notification_thread_command_unregister_trigger(
2104 state, cmd->parameters.trigger,
2105 &cmd->reply_code);
2106 break;
2107 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2108 DBG("[notification-thread] Received add channel command");
2109 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2110 state,
2111 cmd->parameters.add_channel.session.name,
2112 cmd->parameters.add_channel.session.uid,
2113 cmd->parameters.add_channel.session.gid,
2114 cmd->parameters.add_channel.channel.name,
2115 cmd->parameters.add_channel.channel.domain,
2116 cmd->parameters.add_channel.channel.key,
2117 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2118 &cmd->reply_code);
2119 break;
2120 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2121 DBG("[notification-thread] Received remove channel command");
2122 ret = handle_notification_thread_command_remove_channel(
2123 state, cmd->parameters.remove_channel.key,
2124 cmd->parameters.remove_channel.domain,
2125 &cmd->reply_code);
2126 break;
0ca52944 2127 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2128 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2129 DBG("[notification-thread] Received session rotation %s command",
2130 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2131 "ongoing" : "completed");
2132 ret = handle_notification_thread_command_session_rotation(
0ca52944 2133 state,
ed327204
JG
2134 cmd->type,
2135 cmd->parameters.session_rotation.session_name,
2136 cmd->parameters.session_rotation.uid,
2137 cmd->parameters.session_rotation.gid,
2138 cmd->parameters.session_rotation.trace_archive_chunk_id,
2139 cmd->parameters.session_rotation.location,
0ca52944
JG
2140 &cmd->reply_code);
2141 break;
ab0ee2ca
JG
2142 case NOTIFICATION_COMMAND_TYPE_QUIT:
2143 DBG("[notification-thread] Received quit command");
2144 cmd->reply_code = LTTNG_OK;
2145 ret = 1;
2146 goto end;
2147 default:
2148 ERR("[notification-thread] Unknown internal command received");
2149 goto error_unlock;
2150 }
2151
2152 if (ret) {
2153 goto error_unlock;
2154 }
2155end:
2156 cds_list_del(&cmd->cmd_list_node);
8ada111f 2157 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2158 pthread_mutex_unlock(&handle->cmd_queue.lock);
2159 return ret;
2160error_unlock:
2161 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2162 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2163 pthread_mutex_unlock(&handle->cmd_queue.lock);
2164 cmd->reply_code = LTTNG_ERR_FATAL;
2165error:
2166 /* Indicate a fatal error to the caller. */
2167 return -1;
2168}
2169
2170static
2171unsigned long hash_client_socket(int socket)
2172{
2173 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
2174}
2175
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the F_SETFL fcntl() call on success, -1 if either
 * fcntl() call fails (the failure is logged through PERROR).
 */
static
int socket_set_non_blocking(int socket)
{
	int status;
	int current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	/* Set the socket as non-blocking. */
	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return -1;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
2198
2199static
14fa22f8 2200int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2201{
2202 int ret;
2203
2204 ret = lttng_dynamic_buffer_set_size(
2205 &client->communication.inbound.buffer, 0);
2206 assert(!ret);
2207
2208 client->communication.inbound.bytes_to_receive =
2209 sizeof(struct lttng_notification_channel_message);
2210 client->communication.inbound.msg_type =
2211 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2212 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2213 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2214 ret = lttng_dynamic_buffer_set_size(
2215 &client->communication.inbound.buffer,
2216 client->communication.inbound.bytes_to_receive);
2217 return ret;
ab0ee2ca
JG
2218}
2219
2220int handle_notification_thread_client_connect(
2221 struct notification_thread_state *state)
2222{
2223 int ret;
2224 struct notification_client *client;
2225
2226 DBG("[notification-thread] Handling new notification channel client connection");
2227
2228 client = zmalloc(sizeof(*client));
2229 if (!client) {
2230 /* Fatal error. */
2231 ret = -1;
2232 goto error;
2233 }
2234 CDS_INIT_LIST_HEAD(&client->condition_list);
2235 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2236 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 2237 client->communication.inbound.expect_creds = true;
14fa22f8
JG
2238 ret = client_reset_inbound_state(client);
2239 if (ret) {
2240 ERR("[notification-thread] Failed to reset client communication's inbound state");
2241 ret = 0;
2242 goto error;
2243 }
ab0ee2ca
JG
2244
2245 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2246 if (ret < 0) {
2247 ERR("[notification-thread] Failed to accept new notification channel client connection");
2248 ret = 0;
2249 goto error;
2250 }
2251
2252 client->socket = ret;
2253
2254 ret = socket_set_non_blocking(client->socket);
2255 if (ret) {
2256 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2257 goto error;
2258 }
2259
2260 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2261 if (ret < 0) {
2262 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2263 ret = 0;
2264 goto error;
2265 }
2266
2267 ret = lttng_poll_add(&state->events, client->socket,
2268 LPOLLIN | LPOLLERR |
2269 LPOLLHUP | LPOLLRDHUP);
2270 if (ret < 0) {
2271 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2272 ret = 0;
2273 goto error;
2274 }
2275 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2276 client->socket);
2277
ab0ee2ca
JG
2278 rcu_read_lock();
2279 cds_lfht_add(state->client_socket_ht,
2280 hash_client_socket(client->socket),
2281 &client->client_socket_ht_node);
2282 rcu_read_unlock();
2283
2284 return ret;
2285error:
2286 notification_client_destroy(client, state);
2287 return ret;
2288}
2289
2290int handle_notification_thread_client_disconnect(
2291 int client_socket,
2292 struct notification_thread_state *state)
2293{
2294 int ret = 0;
2295 struct notification_client *client;
2296
2297 rcu_read_lock();
2298 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2299 client_socket);
2300 client = get_client_from_socket(client_socket, state);
2301 if (!client) {
2302 /* Internal state corruption, fatal error. */
2303 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2304 client_socket);
2305 ret = -1;
2306 goto end;
2307 }
2308
2309 ret = lttng_poll_del(&state->events, client_socket);
2310 if (ret) {
2311 ERR("[notification-thread] Failed to remove client socket from poll set");
2312 }
2313 cds_lfht_del(state->client_socket_ht,
2314 &client->client_socket_ht_node);
2315 notification_client_destroy(client, state);
2316end:
2317 rcu_read_unlock();
2318 return ret;
2319}
2320
2321int handle_notification_thread_client_disconnect_all(
2322 struct notification_thread_state *state)
2323{
2324 struct cds_lfht_iter iter;
2325 struct notification_client *client;
2326 bool error_encoutered = false;
2327
2328 rcu_read_lock();
2329 DBG("[notification-thread] Closing all client connections");
2330 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2331 client_socket_ht_node) {
2332 int ret;
2333
2334 ret = handle_notification_thread_client_disconnect(
2335 client->socket, state);
2336 if (ret) {
2337 error_encoutered = true;
2338 }
2339 }
2340 rcu_read_unlock();
2341 return error_encoutered ? 1 : 0;
2342}
2343
2344int handle_notification_thread_trigger_unregister_all(
2345 struct notification_thread_state *state)
2346{
4149ace8 2347 bool error_occurred = false;
ab0ee2ca
JG
2348 struct cds_lfht_iter iter;
2349 struct lttng_trigger_ht_element *trigger_ht_element;
2350
2351 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2352 node) {
2353 int ret = handle_notification_thread_command_unregister_trigger(
2354 state, trigger_ht_element->trigger, NULL);
2355 if (ret) {
4149ace8 2356 error_occurred = true;
ab0ee2ca
JG
2357 }
2358 }
4149ace8 2359 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2360}
2361
2362static
2363int client_flush_outgoing_queue(struct notification_client *client,
2364 struct notification_thread_state *state)
2365{
2366 ssize_t ret;
2367 size_t to_send_count;
2368
2369 assert(client->communication.outbound.buffer.size != 0);
2370 to_send_count = client->communication.outbound.buffer.size;
2371 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2372 client->socket);
2373
2374 ret = lttcomm_send_unix_sock_non_block(client->socket,
2375 client->communication.outbound.buffer.data,
2376 to_send_count);
2377 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2378 (ret > 0 && ret < to_send_count)) {
2379 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2380 client->socket);
2381 to_send_count -= max(ret, 0);
2382
2383 memcpy(client->communication.outbound.buffer.data,
2384 client->communication.outbound.buffer.data +
2385 client->communication.outbound.buffer.size - to_send_count,
2386 to_send_count);
2387 ret = lttng_dynamic_buffer_set_size(
2388 &client->communication.outbound.buffer,
2389 to_send_count);
2390 if (ret) {
2391 goto error;
2392 }
2393
2394 /*
2395 * We want to be notified whenever there is buffer space
2396 * available to send the rest of the payload.
2397 */
2398 ret = lttng_poll_mod(&state->events, client->socket,
2399 CLIENT_POLL_MASK_IN_OUT);
2400 if (ret) {
2401 goto error;
2402 }
2403 } else if (ret < 0) {
2404 /* Generic error, disconnect the client. */
2405 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2406 client->socket);
2407 ret = handle_notification_thread_client_disconnect(
2408 client->socket, state);
2409 if (ret) {
2410 goto error;
2411 }
2412 } else {
2413 /* No error and flushed the queue completely. */
2414 ret = lttng_dynamic_buffer_set_size(
2415 &client->communication.outbound.buffer, 0);
2416 if (ret) {
2417 goto error;
2418 }
2419 ret = lttng_poll_mod(&state->events, client->socket,
2420 CLIENT_POLL_MASK_IN);
2421 if (ret) {
2422 goto error;
2423 }
2424
2425 client->communication.outbound.queued_command_reply = false;
2426 client->communication.outbound.dropped_notification = false;
2427 }
2428
2429 return 0;
2430error:
2431 return -1;
2432}
2433
2434static
2435int client_send_command_reply(struct notification_client *client,
2436 struct notification_thread_state *state,
2437 enum lttng_notification_channel_status status)
2438{
2439 int ret;
2440 struct lttng_notification_channel_command_reply reply = {
2441 .status = (int8_t) status,
2442 };
2443 struct lttng_notification_channel_message msg = {
2444 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2445 .size = sizeof(reply),
2446 };
2447 char buffer[sizeof(msg) + sizeof(reply)];
2448
2449 if (client->communication.outbound.queued_command_reply) {
2450 /* Protocol error. */
2451 goto error;
2452 }
2453
2454 memcpy(buffer, &msg, sizeof(msg));
2455 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2456 DBG("[notification-thread] Send command reply (%i)", (int) status);
2457
2458 /* Enqueue buffer to outgoing queue and flush it. */
2459 ret = lttng_dynamic_buffer_append(
2460 &client->communication.outbound.buffer,
2461 buffer, sizeof(buffer));
2462 if (ret) {
2463 goto error;
2464 }
2465
2466 ret = client_flush_outgoing_queue(client, state);
2467 if (ret) {
2468 goto error;
2469 }
2470
2471 if (client->communication.outbound.buffer.size != 0) {
2472 /* Queue could not be emptied. */
2473 client->communication.outbound.queued_command_reply = true;
2474 }
2475
2476 return 0;
2477error:
2478 return -1;
2479}
2480
2481static
2482int client_dispatch_message(struct notification_client *client,
2483 struct notification_thread_state *state)
2484{
2485 int ret = 0;
2486
2487 if (client->communication.inbound.msg_type !=
2488 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2489 client->communication.inbound.msg_type !=
2490 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2491 !client->validated) {
2492 WARN("[notification-thread] client attempted a command before handshake");
2493 ret = -1;
2494 goto end;
2495 }
2496
2497 switch (client->communication.inbound.msg_type) {
2498 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2499 {
2500 /*
2501 * Receiving message header. The function will be called again
2502 * once the rest of the message as been received and can be
2503 * interpreted.
2504 */
2505 const struct lttng_notification_channel_message *msg;
2506
2507 assert(sizeof(*msg) ==
2508 client->communication.inbound.buffer.size);
2509 msg = (const struct lttng_notification_channel_message *)
2510 client->communication.inbound.buffer.data;
2511
2512 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2513 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2514 ret = -1;
2515 goto end;
2516 }
2517
2518 switch (msg->type) {
2519 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2520 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2521 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2522 break;
2523 default:
2524 ret = -1;
2525 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2526 goto end;
2527 }
2528
2529 client->communication.inbound.bytes_to_receive = msg->size;
2530 client->communication.inbound.msg_type =
2531 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 2532 ret = lttng_dynamic_buffer_set_size(
14fa22f8 2533 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
2534 if (ret) {
2535 goto end;
2536 }
2537 break;
2538 }
2539 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2540 {
2541 struct lttng_notification_channel_command_handshake *handshake_client;
2542 struct lttng_notification_channel_command_handshake handshake_reply = {
2543 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2544 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2545 };
2546 struct lttng_notification_channel_message msg_header = {
2547 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2548 .size = sizeof(handshake_reply),
2549 };
2550 enum lttng_notification_channel_status status =
2551 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2552 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2553
2554 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2555 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2556 sizeof(handshake_reply));
2557
2558 handshake_client =
2559 (struct lttng_notification_channel_command_handshake *)
2560 client->communication.inbound.buffer.data;
47a32869 2561 client->major = handshake_client->major;
ab0ee2ca
JG
2562 client->minor = handshake_client->minor;
2563 if (!client->communication.inbound.creds_received) {
2564 ERR("[notification-thread] No credentials received from client");
2565 ret = -1;
2566 goto end;
2567 }
2568
2569 client->uid = LTTNG_SOCK_GET_UID_CRED(
2570 &client->communication.inbound.creds);
2571 client->gid = LTTNG_SOCK_GET_GID_CRED(
2572 &client->communication.inbound.creds);
2573 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2574 client->uid, client->gid, (int) client->major,
2575 (int) client->minor);
2576
2577 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2578 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2579 }
2580
2581 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2582 send_buffer, sizeof(send_buffer));
2583 if (ret) {
2584 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2585 goto end;
2586 }
2587
2588 ret = client_flush_outgoing_queue(client, state);
2589 if (ret) {
2590 goto end;
2591 }
2592
2593 ret = client_send_command_reply(client, state, status);
2594 if (ret) {
2595 ERR("[notification-thread] Failed to send reply to notification channel client");
2596 goto end;
2597 }
2598
2599 /* Set reception state to receive the next message header. */
14fa22f8
JG
2600 ret = client_reset_inbound_state(client);
2601 if (ret) {
2602 ERR("[notification-thread] Failed to reset client communication's inbound state");
2603 goto end;
2604 }
ab0ee2ca
JG
2605 client->validated = true;
2606 break;
2607 }
2608 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2609 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2610 {
2611 struct lttng_condition *condition;
2612 enum lttng_notification_channel_status status =
2613 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2614 const struct lttng_buffer_view condition_view =
2615 lttng_buffer_view_from_dynamic_buffer(
2616 &client->communication.inbound.buffer,
2617 0, -1);
2618 size_t expected_condition_size =
2619 client->communication.inbound.buffer.size;
2620
2621 ret = lttng_condition_create_from_buffer(&condition_view,
2622 &condition);
2623 if (ret != expected_condition_size) {
2624 ERR("[notification-thread] Malformed condition received from client");
2625 goto end;
2626 }
2627
2628 if (client->communication.inbound.msg_type ==
2629 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2630 ret = notification_thread_client_subscribe(client,
2631 condition, state, &status);
2632 } else {
2633 ret = notification_thread_client_unsubscribe(client,
2634 condition, state, &status);
2635 }
2636 if (ret) {
2637 goto end;
2638 }
2639
2640 ret = client_send_command_reply(client, state, status);
2641 if (ret) {
2642 ERR("[notification-thread] Failed to send reply to notification channel client");
2643 goto end;
2644 }
2645
2646 /* Set reception state to receive the next message header. */
14fa22f8
JG
2647 ret = client_reset_inbound_state(client);
2648 if (ret) {
2649 ERR("[notification-thread] Failed to reset client communication's inbound state");
2650 goto end;
2651 }
ab0ee2ca
JG
2652 break;
2653 }
2654 default:
2655 abort();
2656 }
2657end:
2658 return ret;
2659}
2660
2661/* Incoming data from client. */
2662int handle_notification_thread_client_in(
2663 struct notification_thread_state *state, int socket)
2664{
14fa22f8 2665 int ret = 0;
ab0ee2ca
JG
2666 struct notification_client *client;
2667 ssize_t recv_ret;
2668 size_t offset;
2669
2670 client = get_client_from_socket(socket, state);
2671 if (!client) {
2672 /* Internal error, abort. */
2673 ret = -1;
2674 goto end;
2675 }
2676
14fa22f8
JG
2677 offset = client->communication.inbound.buffer.size -
2678 client->communication.inbound.bytes_to_receive;
01ea340e 2679 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2680 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2681 client->communication.inbound.buffer.data + offset,
2682 client->communication.inbound.bytes_to_receive,
2683 &client->communication.inbound.creds);
2684 if (recv_ret > 0) {
01ea340e 2685 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2686 client->communication.inbound.creds_received = true;
2687 }
2688 } else {
2689 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2690 client->communication.inbound.buffer.data + offset,
2691 client->communication.inbound.bytes_to_receive);
2692 }
2693 if (recv_ret < 0) {
2694 goto error_disconnect_client;
2695 }
2696
2697 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2698 if (client->communication.inbound.bytes_to_receive == 0) {
2699 ret = client_dispatch_message(client, state);
2700 if (ret) {
2701 /*
2702 * Only returns an error if this client must be
2703 * disconnected.
2704 */
2705 goto error_disconnect_client;
2706 }
2707 } else {
2708 goto end;
2709 }
2710end:
2711 return ret;
2712error_disconnect_client:
2713 ret = handle_notification_thread_client_disconnect(socket, state);
2714 return ret;
2715}
2716
/*
 * Client ready to receive outgoing data.
 *
 * Looks up the client associated with `socket` and attempts to flush its
 * outgoing queue.
 *
 * Returns -1 if no client is known for `socket`, the result of
 * client_flush_outgoing_queue() otherwise.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2738
2739static
e8360425
JD
2740bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2741 const struct channel_state_sample *sample,
2742 uint64_t buffer_capacity)
ab0ee2ca
JG
2743{
2744 bool result = false;
2745 uint64_t threshold;
2746 enum lttng_condition_type condition_type;
e8360425 2747 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2748 condition, struct lttng_condition_buffer_usage,
2749 parent);
2750
ab0ee2ca
JG
2751 if (use_condition->threshold_bytes.set) {
2752 threshold = use_condition->threshold_bytes.value;
2753 } else {
2754 /*
2755 * Threshold was expressed as a ratio.
2756 *
2757 * TODO the threshold (in bytes) of conditions expressed
2758 * as a ratio of total buffer size could be cached to
2759 * forego this double-multiplication or it could be performed
2760 * as fixed-point math.
2761 *
2762 * Note that caching should accomodate the case where the
2763 * condition applies to multiple channels (i.e. don't assume
2764 * that all channels matching my_chann* have the same size...)
2765 */
2766 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2767 (double) buffer_capacity);
2768 }
2769
2770 condition_type = lttng_condition_get_type(condition);
2771 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2772 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2773 threshold, sample->highest_usage);
2774
2775 /*
2776 * The low condition should only be triggered once _all_ of the
2777 * streams in a channel have gone below the "low" threshold.
2778 */
2779 if (sample->highest_usage <= threshold) {
2780 result = true;
2781 }
2782 } else {
2783 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2784 threshold, sample->highest_usage);
2785
2786 /*
2787 * For high buffer usage scenarios, we want to trigger whenever
2788 * _any_ of the streams has reached the "high" threshold.
2789 */
2790 if (sample->highest_usage >= threshold) {
2791 result = true;
2792 }
2793 }
e8360425 2794
ab0ee2ca
JG
2795 return result;
2796}
2797
2798static
e8360425
JD
2799bool evaluate_session_consumed_size_condition(
2800 const struct lttng_condition *condition,
2801 uint64_t session_consumed_size)
2802{
2803 uint64_t threshold;
2804 const struct lttng_condition_session_consumed_size *size_condition =
2805 container_of(condition,
2806 struct lttng_condition_session_consumed_size,
2807 parent);
2808
2809 threshold = size_condition->consumed_threshold_bytes.value;
2810 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2811 threshold, session_consumed_size);
2812 return session_consumed_size >= threshold;
2813}
2814
2815static
ed327204 2816int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 2817 struct lttng_evaluation **evaluation,
e8360425
JD
2818 const struct notification_thread_state *state,
2819 const struct channel_state_sample *previous_sample,
2820 const struct channel_state_sample *latest_sample,
2821 uint64_t previous_session_consumed_total,
2822 uint64_t latest_session_consumed_total,
2823 struct channel_info *channel_info)
ab0ee2ca
JG
2824{
2825 int ret = 0;
2826 enum lttng_condition_type condition_type;
e8360425
JD
2827 const bool previous_sample_available = !!previous_sample;
2828 bool previous_sample_result = false;
ab0ee2ca
JG
2829 bool latest_sample_result;
2830
2831 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 2832
e8360425
JD
2833 switch (condition_type) {
2834 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2835 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2836 if (caa_likely(previous_sample_available)) {
2837 previous_sample_result =
2838 evaluate_buffer_usage_condition(condition,
2839 previous_sample, channel_info->capacity);
2840 }
2841 latest_sample_result = evaluate_buffer_usage_condition(
2842 condition, latest_sample,
2843 channel_info->capacity);
2844 break;
2845 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2846 if (caa_likely(previous_sample_available)) {
2847 previous_sample_result =
2848 evaluate_session_consumed_size_condition(
2849 condition,
2850 previous_session_consumed_total);
2851 }
2852 latest_sample_result =
2853 evaluate_session_consumed_size_condition(
2854 condition,
2855 latest_session_consumed_total);
2856 break;
2857 default:
2858 /* Unknown condition type; internal error. */
2859 abort();
2860 }
ab0ee2ca
JG
2861
2862 if (!latest_sample_result ||
2863 (previous_sample_result == latest_sample_result)) {
2864 /*
2865 * Only trigger on a condition evaluation transition.
2866 *
2867 * NOTE: This edge-triggered logic may not be appropriate for
2868 * future condition types.
2869 */
2870 goto end;
2871 }
2872
e8360425
JD
2873 if (!evaluation || !latest_sample_result) {
2874 goto end;
2875 }
2876
2877 switch (condition_type) {
2878 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2879 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
2880 *evaluation = lttng_evaluation_buffer_usage_create(
2881 condition_type,
2882 latest_sample->highest_usage,
e8360425
JD
2883 channel_info->capacity);
2884 break;
2885 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2886 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
2887 latest_session_consumed_total);
2888 break;
2889 default:
2890 abort();
2891 }
2892
2893 if (!*evaluation) {
2894 ret = -1;
2895 goto end;
ab0ee2ca
JG
2896 }
2897end:
2898 return ret;
2899}
2900
2901static
2902int client_enqueue_dropped_notification(struct notification_client *client,
2903 struct notification_thread_state *state)
2904{
2905 int ret;
2906 struct lttng_notification_channel_message msg = {
2907 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2908 .size = 0,
2909 };
2910
2911 ret = lttng_dynamic_buffer_append(
2912 &client->communication.outbound.buffer, &msg,
2913 sizeof(msg));
2914 return ret;
2915}
2916
2917static
9b63a4aa
JG
2918int send_evaluation_to_clients(const struct lttng_trigger *trigger,
2919 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
2920 struct notification_client_list* client_list,
2921 struct notification_thread_state *state,
2922 uid_t channel_uid, gid_t channel_gid)
2923{
2924 int ret = 0;
2925 struct lttng_dynamic_buffer msg_buffer;
2926 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
2927 const struct lttng_notification notification = {
2928 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
2929 .evaluation = (struct lttng_evaluation *) evaluation,
2930 };
3647288f
JG
2931 struct lttng_notification_channel_message msg_header = {
2932 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
2933 };
ab0ee2ca
JG
2934
2935 lttng_dynamic_buffer_init(&msg_buffer);
2936
3647288f
JG
2937 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
2938 sizeof(msg_header));
ab0ee2ca
JG
2939 if (ret) {
2940 goto end;
2941 }
2942
9b63a4aa 2943 ret = lttng_notification_serialize(&notification, &msg_buffer);
ab0ee2ca 2944 if (ret) {
ab0ee2ca
JG
2945 ERR("[notification-thread] Failed to serialize notification");
2946 ret = -1;
2947 goto end;
2948 }
2949
3647288f
JG
2950 /* Update payload size. */
2951 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
2952 (uint32_t) (msg_buffer.size - sizeof(msg_header));
2953
ab0ee2ca
JG
2954 cds_list_for_each_entry_safe(client_list_element, tmp,
2955 &client_list->list, node) {
2956 struct notification_client *client =
2957 client_list_element->client;
2958
2959 if (client->uid != channel_uid && client->gid != channel_gid &&
2960 client->uid != 0) {
2961 /* Client is not allowed to monitor this channel. */
2962 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2963 continue;
2964 }
2965
2966 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2967 client->socket, msg_buffer.size);
2968 if (client->communication.outbound.buffer.size) {
2969 /*
2970 * Outgoing data is already buffered for this client;
2971 * drop the notification and enqueue a "dropped
2972 * notification" message if this is the first dropped
2973 * notification since the socket spilled-over to the
2974 * queue.
2975 */
2976 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2977 client->socket);
2978 if (!client->communication.outbound.dropped_notification) {
2979 client->communication.outbound.dropped_notification = true;
2980 ret = client_enqueue_dropped_notification(
2981 client, state);
2982 if (ret) {
2983 goto end;
2984 }
2985 }
2986 continue;
2987 }
2988
2989 ret = lttng_dynamic_buffer_append_buffer(
2990 &client->communication.outbound.buffer,
2991 &msg_buffer);
2992 if (ret) {
2993 goto end;
2994 }
2995
2996 ret = client_flush_outgoing_queue(client, state);
2997 if (ret) {
2998 goto end;
2999 }
3000 }
3001 ret = 0;
3002end:
ab0ee2ca
JG
3003 lttng_dynamic_buffer_reset(&msg_buffer);
3004 return ret;
3005}
3006
3007int handle_notification_thread_channel_sample(
3008 struct notification_thread_state *state, int pipe,
3009 enum lttng_domain_type domain)
3010{
3011 int ret = 0;
3012 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
3013 struct channel_info *channel_info;
3014 struct cds_lfht_node *node;
3015 struct cds_lfht_iter iter;
3016 struct lttng_channel_trigger_list *trigger_list;
3017 struct lttng_trigger_list_element *trigger_list_element;
3018 bool previous_sample_available = false;
e8360425
JD
3019 struct channel_state_sample previous_sample, latest_sample;
3020 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
3021
3022 /*
3023 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3024 * ensuring that read/write of sampling messages are atomic.
3025 */
7c2551ef 3026 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
3027 if (ret != sizeof(sample_msg)) {
3028 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3029 pipe);
3030 ret = -1;
3031 goto end;
3032 }
3033
3034 ret = 0;
3035 latest_sample.key.key = sample_msg.key;
3036 latest_sample.key.domain = domain;
3037 latest_sample.highest_usage = sample_msg.highest;
3038 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 3039 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
3040
3041 rcu_read_lock();
3042
3043 /* Retrieve the channel's informations */
3044 cds_lfht_lookup(state->channels_ht,
3045 hash_channel_key(&latest_sample.key),
3046 match_channel_info,
3047 &latest_sample.key,
3048 &iter);
3049 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3050 if (caa_unlikely(!node)) {
ab0ee2ca
JG
3051 /*
3052 * Not an error since the consumer can push a sample to the pipe
3053 * and the rest of the session daemon could notify us of the
3054 * channel's destruction before we get a chance to process that
3055 * sample.
3056 */
3057 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3058 latest_sample.key.key,
3059 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3060 "user space");
3061 goto end_unlock;
3062 }
3063 channel_info = caa_container_of(node, struct channel_info,
3064 channels_ht_node);
e8360425 3065 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 3066 channel_info->name,
ab0ee2ca 3067 latest_sample.key.key,
8abe313a 3068 channel_info->session_info->name,
ab0ee2ca 3069 latest_sample.highest_usage,
e8360425
JD
3070 latest_sample.lowest_usage,
3071 latest_sample.channel_total_consumed);
3072
3073 previous_session_consumed_total =
3074 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
3075
3076 /* Retrieve the channel's last sample, if it exists, and update it. */
3077 cds_lfht_lookup(state->channel_state_ht,
3078 hash_channel_key(&latest_sample.key),
3079 match_channel_state_sample,
3080 &latest_sample.key,
3081 &iter);
3082 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3083 if (caa_likely(node)) {
ab0ee2ca
JG
3084 struct channel_state_sample *stored_sample;
3085
3086 /* Update the sample stored. */
3087 stored_sample = caa_container_of(node,
3088 struct channel_state_sample,
3089 channel_state_ht_node);
e8360425 3090
ab0ee2ca
JG
3091 memcpy(&previous_sample, stored_sample,
3092 sizeof(previous_sample));
3093 stored_sample->highest_usage = latest_sample.highest_usage;
3094 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 3095 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 3096 previous_sample_available = true;
e8360425
JD
3097
3098 latest_session_consumed_total =
3099 previous_session_consumed_total +
3100 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
3101 } else {
3102 /*
3103 * This is the channel's first sample, allocate space for and
3104 * store the new sample.
3105 */
3106 struct channel_state_sample *stored_sample;
3107
3108 stored_sample = zmalloc(sizeof(*stored_sample));
3109 if (!stored_sample) {
3110 ret = -1;
3111 goto end_unlock;
3112 }
3113
3114 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3115 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3116 cds_lfht_add(state->channel_state_ht,
3117 hash_channel_key(&stored_sample->key),
3118 &stored_sample->channel_state_ht_node);
e8360425
JD
3119
3120 latest_session_consumed_total =
3121 previous_session_consumed_total +
3122 latest_sample.channel_total_consumed;
ab0ee2ca
JG
3123 }
3124
e8360425
JD
3125 channel_info->session_info->consumed_data_size =
3126 latest_session_consumed_total;
3127
ab0ee2ca
JG
3128 /* Find triggers associated with this channel. */
3129 cds_lfht_lookup(state->channel_triggers_ht,
3130 hash_channel_key(&latest_sample.key),
3131 match_channel_trigger_list,
3132 &latest_sample.key,
3133 &iter);
3134 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3135 if (caa_likely(!node)) {
ab0ee2ca
JG
3136 goto end_unlock;
3137 }
3138
3139 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3140 channel_triggers_ht_node);
3141 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3142 node) {
9b63a4aa
JG
3143 const struct lttng_condition *condition;
3144 const struct lttng_action *action;
3145 const struct lttng_trigger *trigger;
ab0ee2ca
JG
3146 struct notification_client_list *client_list;
3147 struct lttng_evaluation *evaluation = NULL;
3148
3149 trigger = trigger_list_element->trigger;
9b63a4aa 3150 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 3151 assert(condition);
9b63a4aa 3152 action = lttng_trigger_get_const_action(trigger);
ab0ee2ca
JG
3153
3154 /* Notify actions are the only type currently supported. */
9b63a4aa 3155 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
3156 LTTNG_ACTION_TYPE_NOTIFY);
3157
3158 /*
3159 * Check if any client is subscribed to the result of this
3160 * evaluation.
3161 */
ed327204
JG
3162 client_list = get_client_list_from_condition(state, condition);
3163 assert(client_list);
ab0ee2ca
JG
3164 if (cds_list_empty(&client_list->list)) {
3165 /*
3166 * No clients interested in the evaluation's result,
3167 * skip it.
3168 */
3169 continue;
3170 }
3171
ed327204 3172 ret = evaluate_buffer_condition(condition, &evaluation, state,
ab0ee2ca 3173 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
3174 &latest_sample,
3175 previous_session_consumed_total,
3176 latest_session_consumed_total,
3177 channel_info);
3178 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3179 goto end_unlock;
3180 }
3181
e8360425 3182 if (caa_likely(!evaluation)) {
ab0ee2ca
JG
3183 continue;
3184 }
3185
3186 /* Dispatch evaluation result to all clients. */
3187 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3188 evaluation, client_list, state,
8abe313a
JG
3189 channel_info->session_info->uid,
3190 channel_info->session_info->gid);
3191 lttng_evaluation_destroy(evaluation);
e0b5f87b 3192 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3193 goto end_unlock;
3194 }
3195 }
3196end_unlock:
3197 rcu_read_unlock();
3198end:
3199 return ret;
3200}
This page took 0.180243 seconds and 4 git commands to generate.