2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: LGPL-2.1-only
8 #include <lttng/notification/notification-internal.h>
9 #include <lttng/notification/channel-internal.h>
10 #include <lttng/condition/condition-internal.h>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/dynamic-buffer.h>
15 #include <common/utils.h>
16 #include <common/defaults.h>
17 #include <common/payload.h>
18 #include <common/payload-view.h>
19 #include <common/unix.h>
21 #include "lttng-ctl-helper.h"
22 #include <common/compat/poll.h>
25 int handshake(struct lttng_notification_channel
*channel
);
28 * Populates the reception buffer with the next complete message.
29 * The caller must acquire the channel's lock.
32 int receive_message(struct lttng_notification_channel
*channel
)
35 struct lttng_notification_channel_message msg
;
37 lttng_payload_clear(&channel
->reception_payload
);
39 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
45 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
50 /* Add message header at buffer's start. */
51 ret
= lttng_dynamic_buffer_append(&channel
->reception_payload
.buffer
, &msg
,
57 /* Reserve space for the payload. */
58 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_payload
.buffer
,
59 channel
->reception_payload
.buffer
.size
+ msg
.size
);
64 /* Receive message payload. */
65 ret
= lttcomm_recv_unix_sock(channel
->socket
,
66 channel
->reception_payload
.buffer
.data
+ sizeof(msg
), msg
.size
);
67 if (ret
< (ssize_t
) msg
.size
) {
72 /* Receive message fds. */
74 ret
= lttcomm_recv_payload_fds_unix_sock(channel
->socket
,
75 msg
.fds
, &channel
->reception_payload
);
76 if (ret
< sizeof(int) * msg
.fds
) {
85 lttng_payload_clear(&channel
->reception_payload
);
90 enum lttng_notification_channel_message_type
get_current_message_type(
91 struct lttng_notification_channel
*channel
)
93 struct lttng_notification_channel_message
*msg
;
95 assert(channel
->reception_payload
.buffer
.size
>= sizeof(*msg
));
97 msg
= (struct lttng_notification_channel_message
*)
98 channel
->reception_payload
.buffer
.data
;
99 return (enum lttng_notification_channel_message_type
) msg
->type
;
103 struct lttng_notification
*create_notification_from_current_message(
104 struct lttng_notification_channel
*channel
)
107 struct lttng_notification
*notification
= NULL
;
109 if (channel
->reception_payload
.buffer
.size
<=
110 sizeof(struct lttng_notification_channel_message
)) {
115 struct lttng_payload_view view
= lttng_payload_view_from_payload(
116 &channel
->reception_payload
,
117 sizeof(struct lttng_notification_channel_message
),
120 ret
= lttng_notification_create_from_payload(
121 &view
, ¬ification
);
124 if (ret
!= channel
->reception_payload
.buffer
.size
-
125 sizeof(struct lttng_notification_channel_message
)) {
126 lttng_notification_destroy(notification
);
134 struct lttng_notification_channel
*lttng_notification_channel_create(
135 struct lttng_endpoint
*endpoint
)
138 bool is_in_tracing_group
= false, is_root
= false;
139 char *sock_path
= NULL
;
140 struct lttng_notification_channel
*channel
= NULL
;
143 endpoint
!= lttng_session_daemon_notification_endpoint
) {
147 sock_path
= zmalloc(LTTNG_PATH_MAX
);
152 channel
= zmalloc(sizeof(struct lttng_notification_channel
));
156 channel
->socket
= -1;
157 pthread_mutex_init(&channel
->lock
, NULL
);
158 lttng_payload_init(&channel
->reception_payload
);
159 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
161 is_root
= (getuid() == 0);
163 is_in_tracing_group
= lttng_check_tracing_group();
166 if (is_root
|| is_in_tracing_group
) {
167 ret
= lttng_strncpy(sock_path
,
168 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
171 ret
= -LTTNG_ERR_INVALID
;
175 ret
= lttcomm_connect_unix_sock(sock_path
);
182 /* Fallback to local session daemon. */
183 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
184 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
185 utils_get_home_dir());
186 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
190 ret
= lttcomm_connect_unix_sock(sock_path
);
197 channel
->socket
= fd
;
199 ret
= handshake(channel
);
207 lttng_notification_channel_destroy(channel
);
212 enum lttng_notification_channel_status
213 lttng_notification_channel_get_next_notification(
214 struct lttng_notification_channel
*channel
,
215 struct lttng_notification
**_notification
)
218 struct lttng_notification
*notification
= NULL
;
219 enum lttng_notification_channel_status status
=
220 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
221 struct lttng_poll_event events
;
223 if (!channel
|| !_notification
) {
224 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
228 pthread_mutex_lock(&channel
->lock
);
230 if (channel
->pending_notifications
.count
) {
231 struct pending_notification
*pending_notification
;
233 assert(!cds_list_empty(&channel
->pending_notifications
.list
));
235 /* Deliver one of the pending notifications. */
236 pending_notification
= cds_list_first_entry(
237 &channel
->pending_notifications
.list
,
238 struct pending_notification
,
240 notification
= pending_notification
->notification
;
242 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
244 cds_list_del(&pending_notification
->node
);
245 channel
->pending_notifications
.count
--;
246 free(pending_notification
);
251 * Block on interruptible epoll/poll() instead of the message reception
252 * itself as the recvmsg() wrappers always restart on EINTR. We choose
253 * to wait using interruptible epoll/poll() in order to:
254 * 1) Return if a signal occurs,
255 * 2) Not deal with partially received messages.
257 * The drawback to this approach is that we assume that messages
258 * are complete/well formed. If a message is shorter than its
259 * announced length, receive_message() will block on recvmsg()
260 * and never return (even if a signal is received).
262 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
264 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
267 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
269 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
272 ret
= lttng_poll_wait_interruptible(&events
, -1);
274 status
= (ret
== -1 && errno
== EINTR
) ?
275 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
276 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
280 ret
= receive_message(channel
);
282 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
286 switch (get_current_message_type(channel
)) {
287 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
288 notification
= create_notification_from_current_message(
291 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
295 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
296 /* No payload to consume. */
297 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
300 /* Protocol error. */
301 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
306 lttng_poll_clean(&events
);
308 pthread_mutex_unlock(&channel
->lock
);
309 *_notification
= notification
;
315 int enqueue_dropped_notification(
316 struct lttng_notification_channel
*channel
)
319 struct pending_notification
*pending_notification
;
320 struct cds_list_head
*last_element
=
321 channel
->pending_notifications
.list
.prev
;
323 pending_notification
= caa_container_of(last_element
,
324 struct pending_notification
, node
);
325 if (!pending_notification
->notification
) {
327 * The last enqueued notification indicates dropped
328 * notifications; there is nothing to do as we group
329 * dropped notifications together.
334 if (channel
->pending_notifications
.count
>=
335 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
336 pending_notification
->notification
) {
338 * Discard the last enqueued notification to indicate
339 * that notifications were dropped at this point.
341 lttng_notification_destroy(
342 pending_notification
->notification
);
343 pending_notification
->notification
= NULL
;
347 pending_notification
= zmalloc(sizeof(*pending_notification
));
348 if (!pending_notification
) {
352 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
353 cds_list_add(&pending_notification
->node
,
354 &channel
->pending_notifications
.list
);
355 channel
->pending_notifications
.count
++;
361 int enqueue_notification_from_current_message(
362 struct lttng_notification_channel
*channel
)
365 struct lttng_notification
*notification
;
366 struct pending_notification
*pending_notification
;
368 if (channel
->pending_notifications
.count
>=
369 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
370 /* Drop the notification. */
371 ret
= enqueue_dropped_notification(channel
);
375 pending_notification
= zmalloc(sizeof(*pending_notification
));
376 if (!pending_notification
) {
380 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
382 notification
= create_notification_from_current_message(channel
);
388 pending_notification
->notification
= notification
;
389 cds_list_add(&pending_notification
->node
,
390 &channel
->pending_notifications
.list
);
391 channel
->pending_notifications
.count
++;
395 free(pending_notification
);
399 enum lttng_notification_channel_status
400 lttng_notification_channel_has_pending_notification(
401 struct lttng_notification_channel
*channel
,
402 bool *_notification_pending
)
405 enum lttng_notification_channel_status status
=
406 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
407 struct lttng_poll_event events
;
409 if (!channel
|| !_notification_pending
) {
410 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
414 pthread_mutex_lock(&channel
->lock
);
416 if (channel
->pending_notifications
.count
) {
417 *_notification_pending
= true;
421 if (channel
->socket
< 0) {
422 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
427 * Check, without blocking, if data is available on the channel's
428 * socket. If there is data available, it is safe to read (blocking)
429 * on the socket for a message from the session daemon.
431 * Since all commands wait for the session daemon's reply before
432 * releasing the channel's lock, the protocol only allows for
433 * notifications and "notification dropped" messages to come
434 * through. If we receive a different message type, it is
435 * considered a protocol error.
437 * Note that this function is not guaranteed not to block. This
438 * will block until our peer (the session daemon) has sent a complete
439 * message if we see data available on the socket. If the peer does
440 * not respect the protocol, this may block indefinitely.
442 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
444 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
447 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
449 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
452 /* timeout = 0: return immediately. */
453 ret
= lttng_poll_wait_interruptible(&events
, 0);
455 /* No data available. */
456 *_notification_pending
= false;
458 } else if (ret
< 0) {
459 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
463 /* Data available on socket. */
464 ret
= receive_message(channel
);
466 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
470 switch (get_current_message_type(channel
)) {
471 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
472 ret
= enqueue_notification_from_current_message(channel
);
476 *_notification_pending
= true;
478 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
479 ret
= enqueue_dropped_notification(channel
);
483 *_notification_pending
= true;
486 /* Protocol error. */
487 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
492 lttng_poll_clean(&events
);
494 pthread_mutex_unlock(&channel
->lock
);
500 int receive_command_reply(struct lttng_notification_channel
*channel
,
501 enum lttng_notification_channel_status
*status
)
504 struct lttng_notification_channel_command_reply
*reply
;
507 enum lttng_notification_channel_message_type msg_type
;
509 ret
= receive_message(channel
);
514 msg_type
= get_current_message_type(channel
);
516 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
518 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
519 ret
= enqueue_notification_from_current_message(
525 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
526 ret
= enqueue_dropped_notification(channel
);
531 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
533 struct lttng_notification_channel_command_handshake
*handshake
;
535 handshake
= (struct lttng_notification_channel_command_handshake
*)
536 (channel
->reception_payload
.buffer
.data
+
537 sizeof(struct lttng_notification_channel_message
));
538 channel
->version
.major
= handshake
->major
;
539 channel
->version
.minor
= handshake
->minor
;
540 channel
->version
.set
= true;
550 if (channel
->reception_payload
.buffer
.size
<
551 (sizeof(struct lttng_notification_channel_message
) +
553 /* Invalid message received. */
558 reply
= (struct lttng_notification_channel_command_reply
*)
559 (channel
->reception_payload
.buffer
.data
+
560 sizeof(struct lttng_notification_channel_message
));
561 *status
= (enum lttng_notification_channel_status
) reply
->status
;
567 int handshake(struct lttng_notification_channel
*channel
)
570 enum lttng_notification_channel_status status
=
571 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
572 struct lttng_notification_channel_command_handshake handshake
= {
573 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
574 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
576 struct lttng_notification_channel_message msg_header
= {
577 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
578 .size
= sizeof(handshake
),
580 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
582 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
583 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
585 pthread_mutex_lock(&channel
->lock
);
587 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
588 sizeof(send_buffer
));
593 /* Receive handshake info from the sessiond. */
594 ret
= receive_command_reply(channel
, &status
);
599 if (!channel
->version
.set
) {
604 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
610 pthread_mutex_unlock(&channel
->lock
);
615 enum lttng_notification_channel_status
send_condition_command(
616 struct lttng_notification_channel
*channel
,
617 enum lttng_notification_channel_message_type type
,
618 const struct lttng_condition
*condition
)
622 enum lttng_notification_channel_status status
=
623 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
624 struct lttng_payload payload
;
625 struct lttng_notification_channel_message cmd_header
= {
626 .type
= (int8_t) type
,
629 lttng_payload_init(&payload
);
632 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
636 assert(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
637 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
639 pthread_mutex_lock(&channel
->lock
);
640 socket
= channel
->socket
;
642 if (!lttng_condition_validate(condition
)) {
643 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
647 ret
= lttng_dynamic_buffer_append(&payload
.buffer
, &cmd_header
,
650 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
654 ret
= lttng_condition_serialize(condition
, &payload
);
656 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
660 /* Update payload length. */
661 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->size
=
662 (uint32_t) (payload
.buffer
.size
- sizeof(cmd_header
));
665 struct lttng_payload_view pv
=
666 lttng_payload_view_from_payload(
669 lttng_payload_view_get_fd_handle_count(&pv
);
671 /* Update fd count. */
672 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->fds
=
675 ret
= lttcomm_send_unix_sock(
676 socket
, pv
.buffer
.data
, pv
.buffer
.size
);
678 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
682 /* Pass fds if present. */
684 ret
= lttcomm_send_payload_view_fds_unix_sock(socket
,
687 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
693 ret
= receive_command_reply(channel
, &status
);
695 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
699 pthread_mutex_unlock(&channel
->lock
);
701 lttng_payload_reset(&payload
);
705 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
706 struct lttng_notification_channel
*channel
,
707 const struct lttng_condition
*condition
)
709 return send_condition_command(channel
,
710 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
714 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
715 struct lttng_notification_channel
*channel
,
716 const struct lttng_condition
*condition
)
718 return send_condition_command(channel
,
719 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
723 void lttng_notification_channel_destroy(
724 struct lttng_notification_channel
*channel
)
730 if (channel
->socket
>= 0) {
731 (void) lttcomm_close_unix_sock(channel
->socket
);
733 pthread_mutex_destroy(&channel
->lock
);
734 lttng_payload_reset(&channel
->reception_payload
);