/*
 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License, version 2.1 only,
 * as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
18 #include <lttng/notification/notification-internal.h>
19 #include <lttng/notification/channel-internal.h>
20 #include <lttng/condition/condition-internal.h>
21 #include <lttng/endpoint.h>
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/dynamic-buffer.h>
25 #include <common/utils.h>
26 #include <common/defaults.h>
28 #include "lttng-ctl-helper.h"
29 #include <common/compat/poll.h>
/*
 * Forward declaration: the handshake is defined near the end of this file
 * but is needed by lttng_notification_channel_create().
 */
static int handshake(struct lttng_notification_channel *channel);
35 * Populates the reception buffer with the next complete message.
36 * The caller must acquire the channel's lock.
39 int receive_message(struct lttng_notification_channel
*channel
)
42 struct lttng_notification_channel_message msg
;
44 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
49 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
55 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
60 /* Add message header at buffer's start. */
61 ret
= lttng_dynamic_buffer_append(&channel
->reception_buffer
, &msg
,
67 /* Reserve space for the payload. */
68 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_buffer
,
69 channel
->reception_buffer
.size
+ msg
.size
);
74 /* Receive message payload. */
75 ret
= lttcomm_recv_unix_sock(channel
->socket
,
76 channel
->reception_buffer
.data
+ sizeof(msg
), msg
.size
);
77 if (ret
< (ssize_t
) msg
.size
) {
85 if (lttng_dynamic_buffer_set_size(&channel
->reception_buffer
, 0)) {
92 enum lttng_notification_channel_message_type
get_current_message_type(
93 struct lttng_notification_channel
*channel
)
95 struct lttng_notification_channel_message
*msg
;
97 assert(channel
->reception_buffer
.size
>= sizeof(*msg
));
99 msg
= (struct lttng_notification_channel_message
*)
100 channel
->reception_buffer
.data
;
101 return (enum lttng_notification_channel_message_type
) msg
->type
;
105 struct lttng_notification
*create_notification_from_current_message(
106 struct lttng_notification_channel
*channel
)
109 struct lttng_notification
*notification
= NULL
;
110 struct lttng_buffer_view view
;
112 if (channel
->reception_buffer
.size
<=
113 sizeof(struct lttng_notification_channel_message
)) {
117 view
= lttng_buffer_view_from_dynamic_buffer(&channel
->reception_buffer
,
118 sizeof(struct lttng_notification_channel_message
), -1);
120 ret
= lttng_notification_create_from_buffer(&view
, ¬ification
);
121 if (ret
!= channel
->reception_buffer
.size
-
122 sizeof(struct lttng_notification_channel_message
)) {
123 lttng_notification_destroy(notification
);
131 struct lttng_notification_channel
*lttng_notification_channel_create(
132 struct lttng_endpoint
*endpoint
)
135 bool is_in_tracing_group
= false, is_root
= false;
136 char *sock_path
= NULL
;
137 struct lttng_notification_channel
*channel
= NULL
;
140 endpoint
!= lttng_session_daemon_notification_endpoint
) {
144 sock_path
= zmalloc(LTTNG_PATH_MAX
);
149 channel
= zmalloc(sizeof(struct lttng_notification_channel
));
153 channel
->socket
= -1;
154 pthread_mutex_init(&channel
->lock
, NULL
);
155 lttng_dynamic_buffer_init(&channel
->reception_buffer
);
156 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
158 is_root
= (getuid() == 0);
160 is_in_tracing_group
= lttng_check_tracing_group();
163 if (is_root
|| is_in_tracing_group
) {
164 ret
= lttng_strncpy(sock_path
,
165 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
168 ret
= -LTTNG_ERR_INVALID
;
172 ret
= lttcomm_connect_unix_sock(sock_path
);
179 /* Fallback to local session daemon. */
180 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
181 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
182 utils_get_home_dir());
183 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
187 ret
= lttcomm_connect_unix_sock(sock_path
);
194 channel
->socket
= fd
;
196 ret
= handshake(channel
);
204 lttng_notification_channel_destroy(channel
);
209 enum lttng_notification_channel_status
210 lttng_notification_channel_get_next_notification(
211 struct lttng_notification_channel
*channel
,
212 struct lttng_notification
**_notification
)
215 struct lttng_notification
*notification
= NULL
;
216 enum lttng_notification_channel_status status
=
217 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
218 struct lttng_poll_event events
;
220 if (!channel
|| !_notification
) {
221 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
225 pthread_mutex_lock(&channel
->lock
);
227 if (channel
->pending_notifications
.count
) {
228 struct pending_notification
*pending_notification
;
230 assert(!cds_list_empty(&channel
->pending_notifications
.list
));
232 /* Deliver one of the pending notifications. */
233 pending_notification
= cds_list_first_entry(
234 &channel
->pending_notifications
.list
,
235 struct pending_notification
,
237 notification
= pending_notification
->notification
;
239 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
241 cds_list_del(&pending_notification
->node
);
242 channel
->pending_notifications
.count
--;
243 free(pending_notification
);
248 * Block on interruptible epoll/poll() instead of the message reception
249 * itself as the recvmsg() wrappers always restart on EINTR. We choose
250 * to wait using interruptible epoll/poll() in order to:
251 * 1) Return if a signal occurs,
252 * 2) Not deal with partially received messages.
254 * The drawback to this approach is that we assume that messages
255 * are complete/well formed. If a message is shorter than its
256 * announced length, receive_message() will block on recvmsg()
257 * and never return (even if a signal is received).
259 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
261 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
264 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
266 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
269 ret
= lttng_poll_wait_interruptible(&events
, -1);
271 status
= (ret
== -1 && errno
== EINTR
) ?
272 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
273 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
277 ret
= receive_message(channel
);
279 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
283 switch (get_current_message_type(channel
)) {
284 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
285 notification
= create_notification_from_current_message(
288 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
292 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
293 /* No payload to consume. */
294 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
297 /* Protocol error. */
298 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
303 lttng_poll_clean(&events
);
305 pthread_mutex_unlock(&channel
->lock
);
306 *_notification
= notification
;
312 int enqueue_dropped_notification(
313 struct lttng_notification_channel
*channel
)
316 struct pending_notification
*pending_notification
;
317 struct cds_list_head
*last_element
=
318 channel
->pending_notifications
.list
.prev
;
320 pending_notification
= caa_container_of(last_element
,
321 struct pending_notification
, node
);
322 if (!pending_notification
->notification
) {
324 * The last enqueued notification indicates dropped
325 * notifications; there is nothing to do as we group
326 * dropped notifications together.
331 if (channel
->pending_notifications
.count
>=
332 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
333 pending_notification
->notification
) {
335 * Discard the last enqueued notification to indicate
336 * that notifications were dropped at this point.
338 lttng_notification_destroy(
339 pending_notification
->notification
);
340 pending_notification
->notification
= NULL
;
344 pending_notification
= zmalloc(sizeof(*pending_notification
));
345 if (!pending_notification
) {
349 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
350 cds_list_add(&pending_notification
->node
,
351 &channel
->pending_notifications
.list
);
352 channel
->pending_notifications
.count
++;
358 int enqueue_notification_from_current_message(
359 struct lttng_notification_channel
*channel
)
362 struct lttng_notification
*notification
;
363 struct pending_notification
*pending_notification
;
365 if (channel
->pending_notifications
.count
>=
366 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
367 /* Drop the notification. */
368 ret
= enqueue_dropped_notification(channel
);
372 pending_notification
= zmalloc(sizeof(*pending_notification
));
373 if (!pending_notification
) {
377 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
379 notification
= create_notification_from_current_message(channel
);
385 pending_notification
->notification
= notification
;
386 cds_list_add(&pending_notification
->node
,
387 &channel
->pending_notifications
.list
);
388 channel
->pending_notifications
.count
++;
392 free(pending_notification
);
396 enum lttng_notification_channel_status
397 lttng_notification_channel_has_pending_notification(
398 struct lttng_notification_channel
*channel
,
399 bool *_notification_pending
)
402 enum lttng_notification_channel_status status
=
403 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
404 struct lttng_poll_event events
;
406 if (!channel
|| !_notification_pending
) {
407 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
411 pthread_mutex_lock(&channel
->lock
);
413 if (channel
->pending_notifications
.count
) {
414 *_notification_pending
= true;
418 if (channel
->socket
< 0) {
419 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
424 * Check, without blocking, if data is available on the channel's
425 * socket. If there is data available, it is safe to read (blocking)
426 * on the socket for a message from the session daemon.
428 * Since all commands wait for the session daemon's reply before
429 * releasing the channel's lock, the protocol only allows for
430 * notifications and "notification dropped" messages to come
431 * through. If we receive a different message type, it is
432 * considered a protocol error.
434 * Note that this function is not guaranteed not to block. This
435 * will block until our peer (the session daemon) has sent a complete
436 * message if we see data available on the socket. If the peer does
437 * not respect the protocol, this may block indefinitely.
439 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
441 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
444 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
446 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
449 /* timeout = 0: return immediately. */
450 ret
= lttng_poll_wait_interruptible(&events
, 0);
452 /* No data available. */
453 *_notification_pending
= false;
455 } else if (ret
< 0) {
456 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
460 /* Data available on socket. */
461 ret
= receive_message(channel
);
463 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
467 switch (get_current_message_type(channel
)) {
468 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
469 ret
= enqueue_notification_from_current_message(channel
);
473 *_notification_pending
= true;
475 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
476 ret
= enqueue_dropped_notification(channel
);
480 *_notification_pending
= true;
483 /* Protocol error. */
484 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
489 lttng_poll_clean(&events
);
491 pthread_mutex_unlock(&channel
->lock
);
497 int receive_command_reply(struct lttng_notification_channel
*channel
,
498 enum lttng_notification_channel_status
*status
)
501 struct lttng_notification_channel_command_reply
*reply
;
504 enum lttng_notification_channel_message_type msg_type
;
506 ret
= receive_message(channel
);
511 msg_type
= get_current_message_type(channel
);
513 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
515 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
516 ret
= enqueue_notification_from_current_message(
522 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
523 ret
= enqueue_dropped_notification(channel
);
528 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
530 struct lttng_notification_channel_command_handshake
*handshake
;
532 handshake
= (struct lttng_notification_channel_command_handshake
*)
533 (channel
->reception_buffer
.data
+
534 sizeof(struct lttng_notification_channel_message
));
535 channel
->version
.major
= handshake
->major
;
536 channel
->version
.minor
= handshake
->minor
;
537 channel
->version
.set
= true;
547 if (channel
->reception_buffer
.size
<
548 (sizeof(struct lttng_notification_channel_message
) +
550 /* Invalid message received. */
555 reply
= (struct lttng_notification_channel_command_reply
*)
556 (channel
->reception_buffer
.data
+
557 sizeof(struct lttng_notification_channel_message
));
558 *status
= (enum lttng_notification_channel_status
) reply
->status
;
564 int handshake(struct lttng_notification_channel
*channel
)
567 enum lttng_notification_channel_status status
=
568 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
569 struct lttng_notification_channel_command_handshake handshake
= {
570 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
571 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
573 struct lttng_notification_channel_message msg_header
= {
574 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
575 .size
= sizeof(handshake
),
577 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
579 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
580 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
582 pthread_mutex_lock(&channel
->lock
);
584 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
585 sizeof(send_buffer
));
590 /* Receive handshake info from the sessiond. */
591 ret
= receive_command_reply(channel
, &status
);
596 if (!channel
->version
.set
) {
601 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
607 pthread_mutex_unlock(&channel
->lock
);
612 enum lttng_notification_channel_status
send_condition_command(
613 struct lttng_notification_channel
*channel
,
614 enum lttng_notification_channel_message_type type
,
615 const struct lttng_condition
*condition
)
619 enum lttng_notification_channel_status status
=
620 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
621 struct lttng_dynamic_buffer buffer
;
622 struct lttng_notification_channel_message cmd_header
= {
623 .type
= (int8_t) type
,
626 lttng_dynamic_buffer_init(&buffer
);
629 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
633 assert(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
634 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
636 pthread_mutex_lock(&channel
->lock
);
637 socket
= channel
->socket
;
638 if (!lttng_condition_validate(condition
)) {
639 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
643 ret
= lttng_dynamic_buffer_append(&buffer
, &cmd_header
,
646 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
650 ret
= lttng_condition_serialize(condition
, &buffer
);
652 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
656 /* Update payload length. */
657 ((struct lttng_notification_channel_message
*) buffer
.data
)->size
=
658 (uint32_t) (buffer
.size
- sizeof(cmd_header
));
660 ret
= lttcomm_send_unix_sock(socket
, buffer
.data
, buffer
.size
);
662 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
666 ret
= receive_command_reply(channel
, &status
);
668 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
672 pthread_mutex_unlock(&channel
->lock
);
674 lttng_dynamic_buffer_reset(&buffer
);
678 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
679 struct lttng_notification_channel
*channel
,
680 const struct lttng_condition
*condition
)
682 return send_condition_command(channel
,
683 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
687 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
688 struct lttng_notification_channel
*channel
,
689 const struct lttng_condition
*condition
)
691 return send_condition_command(channel
,
692 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
696 void lttng_notification_channel_destroy(
697 struct lttng_notification_channel
*channel
)
703 if (channel
->socket
>= 0) {
704 (void) lttcomm_close_unix_sock(channel
->socket
);
706 pthread_mutex_destroy(&channel
->lock
);
707 lttng_dynamic_buffer_reset(&channel
->reception_buffer
);