2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
8 #include <common/common.hpp>
9 #include <common/compat/endian.hpp>
10 #include <common/compat/errno.hpp>
11 #include <common/compat/time.hpp>
12 #include <common/index/ctf-index.hpp>
14 #include <lttng/lttng.h>
16 #include <bin/lttng-relayd/lttng-viewer-abi.hpp>
20 #include <netinet/in.h>
25 #include <sys/socket.h>
27 #include <sys/types.h>
30 #include <urcu/list.h>
32 #define SESSION1 "test1"
33 #define RELAYD_URL "net://localhost"
34 #define LIVE_TIMER 2000000
36 /* Number of TAP tests in this file */
38 #define mmap_size 524288
40 #ifdef HAVE_LIBLTTNG_UST_CTL
41 #include <lttng/lttng-export.h>
42 #include <lttng/ust-sigbus.h>
43 LTTNG_EXPORT
DEFINE_LTTNG_UST_SIGBUS_STATE();
47 struct live_session
*session
;
50 int first_packet_offset
;
52 int first_packet_stream_id
= -1;
54 struct viewer_stream
{
56 uint64_t ctf_trace_id
;
65 struct viewer_stream
*streams
;
66 uint64_t live_timer_interval
;
67 uint64_t stream_count
;
71 static ssize_t
lttng_live_recv(int fd
, void *buf
, size_t len
)
74 size_t copied
= 0, to_copy
= len
;
77 ret
= recv(fd
, (char *) buf
+ copied
, to_copy
, 0);
79 LTTNG_ASSERT(ret
<= to_copy
);
83 } while ((ret
> 0 && to_copy
> 0) || (ret
< 0 && errno
== EINTR
));
86 /* ret = 0 means orderly shutdown, ret < 0 is error. */
90 static ssize_t
lttng_live_send(int fd
, const void *buf
, size_t len
)
95 ret
= send(fd
, buf
, len
, MSG_NOSIGNAL
);
96 } while (ret
< 0 && errno
== EINTR
);
100 static int connect_viewer(const char *hostname
)
102 struct hostent
*host
;
103 struct sockaddr_in server_addr
;
106 host
= gethostbyname(hostname
);
112 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
118 server_addr
.sin_family
= AF_INET
;
119 server_addr
.sin_port
= htons(5344);
120 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
121 bzero(&(server_addr
.sin_zero
), 8);
123 if (connect(control_sock
, (struct sockaddr
*) &server_addr
, sizeof(struct sockaddr
)) ==
130 server_addr
.sin_family
= AF_INET
;
131 server_addr
.sin_port
= htons(5345);
132 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
133 bzero(&(server_addr
.sin_zero
), 8);
141 static int establish_connection(void)
143 struct lttng_viewer_cmd cmd
;
144 struct lttng_viewer_connect connect
;
147 cmd
.cmd
= htobe32(LTTNG_VIEWER_CONNECT
);
148 cmd
.data_size
= htobe64(sizeof(connect
));
149 cmd
.cmd_version
= htobe32(0);
151 memset(&connect
, 0, sizeof(connect
));
152 connect
.major
= htobe32(VERSION_MAJOR
);
153 connect
.minor
= htobe32(VERSION_MINOR
);
154 connect
.type
= htobe32(LTTNG_VIEWER_CLIENT_COMMAND
);
156 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
158 diag("Error sending cmd");
161 ret_len
= lttng_live_send(control_sock
, &connect
, sizeof(connect
));
163 diag("Error sending version");
167 ret_len
= lttng_live_recv(control_sock
, &connect
, sizeof(connect
));
169 diag("[error] Remote side has closed connection");
173 diag("Error receiving version");
183 * Returns the number of sessions, should be 1 during the unit test.
185 static int list_sessions(uint64_t *session_id
)
187 struct lttng_viewer_cmd cmd
;
188 struct lttng_viewer_list_sessions list
;
189 struct lttng_viewer_session lsession
;
192 int first_session
= 0;
194 cmd
.cmd
= htobe32(LTTNG_VIEWER_LIST_SESSIONS
);
195 cmd
.data_size
= htobe64(0);
196 cmd
.cmd_version
= htobe32(0);
198 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
200 diag("Error sending cmd");
204 ret_len
= lttng_live_recv(control_sock
, &list
, sizeof(list
));
206 diag("[error] Remote side has closed connection");
210 diag("Error receiving session list");
214 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
215 ret_len
= lttng_live_recv(control_sock
, &lsession
, sizeof(lsession
));
217 diag("Error receiving session");
220 if (lsession
.streams
> 0 && first_session
<= 0) {
221 first_session
= be64toh(lsession
.id
);
222 *session_id
= first_session
;
226 return be32toh(list
.sessions_count
);
232 static int create_viewer_session(void)
234 struct lttng_viewer_cmd cmd
;
235 struct lttng_viewer_create_session_response resp
;
238 cmd
.cmd
= htobe32(LTTNG_VIEWER_CREATE_SESSION
);
239 cmd
.data_size
= htobe64(0);
240 cmd
.cmd_version
= htobe32(0);
242 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
244 diag("[error] Error sending cmd");
247 LTTNG_ASSERT(ret_len
== sizeof(cmd
));
249 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
251 diag("[error] Remote side has closed connection");
255 diag("[error] Error receiving create session reply");
258 LTTNG_ASSERT(ret_len
== sizeof(resp
));
260 if (be32toh(resp
.status
) != LTTNG_VIEWER_CREATE_SESSION_OK
) {
261 diag("[error] Error creating viewer session");
270 static int attach_session(uint64_t id
)
272 struct lttng_viewer_cmd cmd
;
273 struct lttng_viewer_attach_session_request rq
;
274 struct lttng_viewer_attach_session_response rp
;
275 struct lttng_viewer_stream stream
;
279 session
= zmalloc
<live_session
>();
284 cmd
.cmd
= htobe32(LTTNG_VIEWER_ATTACH_SESSION
);
285 cmd
.data_size
= htobe64(sizeof(rq
));
286 cmd
.cmd_version
= htobe32(0);
288 memset(&rq
, 0, sizeof(rq
));
289 rq
.session_id
= htobe64(id
);
290 rq
.seek
= htobe32(LTTNG_VIEWER_SEEK_BEGINNING
);
292 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
294 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
297 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
299 diag("Error sending attach request");
303 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
305 diag("[error] Remote side has closed connection");
309 diag("Error receiving attach response");
312 if (be32toh(rp
.status
) != LTTNG_VIEWER_ATTACH_OK
) {
316 session
->stream_count
= be32toh(rp
.streams_count
);
317 if (session
->stream_count
== 0) {
318 diag("Got session stream count == 0");
321 session
->streams
= calloc
<viewer_stream
>(session
->stream_count
);
322 if (!session
->streams
) {
326 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
327 ret_len
= lttng_live_recv(control_sock
, &stream
, sizeof(stream
));
329 diag("[error] Remote side has closed connection");
333 diag("Error receiving stream");
336 session
->streams
[i
].id
= be64toh(stream
.id
);
338 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
339 session
->streams
[i
].first_read
= 1;
340 session
->streams
[i
].mmap_base
= mmap(
341 NULL
, mmap_size
, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
342 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
347 if (be32toh(stream
.metadata_flag
)) {
348 session
->streams
[i
].metadata_flag
= 1;
351 return session
->stream_count
;
357 static int get_metadata(void)
359 struct lttng_viewer_cmd cmd
;
360 struct lttng_viewer_get_metadata rq
;
361 struct lttng_viewer_metadata_packet rp
;
367 int metadata_stream_id
= -1;
369 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_METADATA
);
370 cmd
.data_size
= htobe64(sizeof(rq
));
371 cmd
.cmd_version
= htobe32(0);
373 for (i
= 0; i
< session
->stream_count
; i
++) {
374 if (session
->streams
[i
].metadata_flag
) {
375 metadata_stream_id
= i
;
380 if (metadata_stream_id
< 0) {
381 diag("No metadata stream found");
385 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
388 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
390 diag("Error sending cmd");
393 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
395 diag("Error sending get_metadata request");
398 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
400 diag("[error] Remote side has closed connection");
404 diag("Error receiving metadata response");
407 switch (be32toh(rp
.status
)) {
408 case LTTNG_VIEWER_METADATA_OK
:
410 case LTTNG_VIEWER_NO_NEW_METADATA
:
411 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
414 case LTTNG_VIEWER_METADATA_ERR
:
415 diag("Got LTTNG_VIEWER_METADATA_ERR:");
418 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
422 len
= be64toh(rp
.len
);
427 data
= calloc
<char>(len
);
429 PERROR("relay data zmalloc");
432 ret_len
= lttng_live_recv(control_sock
, data
, len
);
434 diag("[error] Remote side has closed connection");
435 goto error_free_data
;
438 diag("Error receiving trace packet");
439 goto error_free_data
;
452 static int get_next_index(void)
454 struct lttng_viewer_cmd cmd
;
455 struct lttng_viewer_get_next_index rq
;
456 struct lttng_viewer_index rp
;
460 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_NEXT_INDEX
);
461 cmd
.data_size
= htobe64(sizeof(rq
));
462 cmd
.cmd_version
= htobe32(0);
464 for (id
= 0; id
< session
->stream_count
; id
++) {
465 if (session
->streams
[id
].metadata_flag
) {
468 memset(&rq
, 0, sizeof(rq
));
469 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
472 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
474 diag("Error sending cmd");
477 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
479 diag("Error sending get_next_index request");
482 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
484 diag("[error] Remote side has closed connection");
488 diag("Error receiving index response");
492 rp
.flags
= be32toh(rp
.flags
);
494 switch (be32toh(rp
.status
)) {
495 case LTTNG_VIEWER_INDEX_INACTIVE
:
496 /* Skip this stream. */
497 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
499 case LTTNG_VIEWER_INDEX_OK
:
501 case LTTNG_VIEWER_INDEX_RETRY
:
504 case LTTNG_VIEWER_INDEX_HUP
:
505 diag("Got LTTNG_VIEWER_INDEX_HUP");
506 session
->streams
[id
].id
= -1ULL;
507 session
->streams
[id
].fd
= -1;
509 case LTTNG_VIEWER_INDEX_ERR
:
510 diag("Got LTTNG_VIEWER_INDEX_ERR");
513 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)",
517 if (first_packet_stream_id
< 0) {
519 * Initialize the first packet stream id. That is,
520 * the first active stream encoutered.
522 first_packet_offset
= be64toh(rp
.offset
);
523 first_packet_len
= be64toh(rp
.packet_size
) / CHAR_BIT
;
524 first_packet_stream_id
= id
;
525 diag("Got first packet index with offset %d and len %d",
536 static int get_data_packet(int id
, uint64_t offset
, uint64_t len
)
538 struct lttng_viewer_cmd cmd
;
539 struct lttng_viewer_get_packet rq
;
540 struct lttng_viewer_trace_packet rp
;
543 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_PACKET
);
544 cmd
.data_size
= htobe64(sizeof(rq
));
545 cmd
.cmd_version
= htobe32(0);
547 memset(&rq
, 0, sizeof(rq
));
548 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
549 /* Already in big endian. */
551 rq
.len
= htobe32(len
);
553 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
555 diag("Error sending cmd");
558 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
560 diag("Error sending get_data_packet request");
563 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
565 diag("[error] Remote side has closed connection");
569 diag("Error receiving data response");
572 rp
.flags
= be32toh(rp
.flags
);
574 switch (be32toh(rp
.status
)) {
575 case LTTNG_VIEWER_GET_PACKET_OK
:
576 len
= be32toh(rp
.len
);
578 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
582 case LTTNG_VIEWER_GET_PACKET_RETRY
:
583 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
585 case LTTNG_VIEWER_GET_PACKET_ERR
:
586 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
587 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
590 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
593 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
597 if (len
> mmap_size
) {
598 diag("mmap_size not big enough");
602 ret_len
= lttng_live_recv(control_sock
, session
->streams
[id
].mmap_base
, len
);
604 diag("[error] Remote side has closed connection");
608 diag("Error receiving trace packet");
617 static int detach_viewer_session(uint64_t id
)
619 struct lttng_viewer_cmd cmd
;
620 struct lttng_viewer_detach_session_response resp
;
621 struct lttng_viewer_detach_session_request rq
;
625 cmd
.cmd
= htobe32(LTTNG_VIEWER_DETACH_SESSION
);
626 cmd
.data_size
= htobe64(sizeof(rq
));
627 cmd
.cmd_version
= htobe32(0);
629 memset(&rq
, 0, sizeof(rq
));
630 rq
.session_id
= htobe64(id
);
632 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
634 fprintf(stderr
, "[error] Error sending cmd\n");
639 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
641 fprintf(stderr
, "Error sending attach request\n");
646 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
648 fprintf(stderr
, "[error] Error receiving detach session reply\n");
653 if (be32toh(resp
.status
) != LTTNG_VIEWER_DETACH_SESSION_OK
) {
654 fprintf(stderr
, "[error] Error detaching viewer session\n");
669 plan_tests(NUM_TESTS
);
671 diag("Live unit tests");
673 ret
= connect_viewer("localhost");
674 ok(ret
== 0, "Connect viewer to relayd");
676 ret
= establish_connection();
678 "Established connection and version check with %d.%d",
682 ret
= list_sessions(&session_id
);
683 ok(ret
> 0, "List sessions : %d session(s)", ret
);
688 ret
= create_viewer_session();
689 ok(ret
== 0, "Create viewer session");
691 ret
= attach_session(session_id
);
692 ok(ret
> 0, "Attach to session, %d stream(s) received", ret
);
694 ret
= get_metadata();
695 ok(ret
> 0, "Get metadata, received %d bytes", ret
);
697 ret
= get_next_index();
698 ok(ret
== 0, "Get one index per stream");
700 ret
= get_data_packet(first_packet_stream_id
, first_packet_offset
, first_packet_len
);
702 "Get one data packet for stream %d, offset %d, len %d",
703 first_packet_stream_id
,
707 ret
= detach_viewer_session(session_id
);
708 ok(ret
== 0, "Detach viewer session");
710 ret
= list_sessions(&session_id
);
711 ok(ret
> 0, "List sessions : %d session(s)", ret
);
713 ret
= attach_session(session_id
);
714 ok(ret
> 0, "Attach to session, %d streams received", ret
);
716 return exit_status();