2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
37 #include "lttng-viewer.h"
38 #include "lttng-index.h"
39 #include "network-live.h"
41 #include <babeltrace/babeltrace.h>
42 #include <babeltrace/ctf/events.h>
43 #include <babeltrace/ctf/callbacks.h>
44 #include <babeltrace/ctf/iterator.h>
46 /* for packet_index */
47 #include <babeltrace/ctf/types.h>
49 #include <babeltrace/ctf/metadata.h>
50 #include <babeltrace/ctf-text/types.h>
51 #include <babeltrace/ctf/events-internal.h>
54 * Memory allocation zeroed
/* Zero-initialised allocation helper: expands to calloc(1, x). */
56 #define zmalloc(x) calloc(1, x)
57 /* FIXME : completely arbitrary */
/*
 * Size in bytes (512 KiB) of the anonymous buffer that attach_session()
 * maps for each stream; get_data_packet() refuses packets larger than this.
 */
58 #define mmap_size 524288
/* Socket created by connect_viewer() and used by every send()/recv() in this file. */
60 static int control_sock
;
/*
 * State of the attached session, allocated in setup_network_live().
 * Declared without `static`, so the symbol has external linkage.
 */
61 struct live_session
*session
;
/*
 * NOTE(review): only fragments of the structure definitions are visible
 * here. `streams`, `live_timer_interval` and `stream_count` are reached
 * through `session->` elsewhere in the file, so they presumably belong to
 * struct live_session rather than struct viewer_stream -- confirm against
 * the complete file.
 */
63 struct viewer_stream
{
65 uint64_t ctf_trace_id
;
74 struct viewer_stream
*streams
;
75 uint64_t live_timer_interval
;
76 uint64_t stream_count
;
/*
 * connect_viewer - open the TCP control connection to the relay on `hostname`.
 *
 * Visible steps: resolve `hostname` with gethostbyname(), create an
 * AF_INET/SOCK_STREAM socket stored in the file-scope `control_sock`, and
 * connect() it to port 5344 on the first resolved address (host->h_addr).
 *
 * NOTE(review): the check of `host` against NULL, the error paths and the
 * return statement are not visible in this excerpt -- confirm in the full
 * file. gethostbyname() is obsolete and IPv4-only; getaddrinfo() is the
 * current replacement.
 */
80 int connect_viewer(char *hostname
)
83 struct sockaddr_in server_addr
;
86 host
= gethostbyname(hostname
);
92 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
/* Control port of the relay: 5344, on the first address returned by the resolver. */
98 server_addr
.sin_family
= AF_INET
;
99 server_addr
.sin_port
= htons(5344);
100 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
101 bzero(&(server_addr
.sin_zero
), 8);
103 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
104 sizeof(struct sockaddr
)) == -1) {
/*
 * The address is filled in a second time for port 5345, but no socket()
 * or connect() call using it is visible in this excerpt.
 * NOTE(review): looks like a leftover of a second (data) connection.
 */
110 server_addr
.sin_family
= AF_INET
;
111 server_addr
.sin_port
= htons(5345);
112 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
113 bzero(&(server_addr
.sin_zero
), 8);
/*
 * establish_connection - perform the viewer handshake on `control_sock`.
 *
 * Sends a command header with cmd = VIEWER_CONNECT followed by a
 * struct lttng_viewer_connect announcing version 2.4 and type
 * VIEWER_CLIENT_COMMAND, then receives the peer's reply into the same
 * structure and prints the viewer session id and version to stderr.
 * Each send()/recv() is retried while it fails with EINTR.
 *
 * NOTE(review): cmd.data_size is assigned without a byte-order conversion
 * while every other field goes through htobe32() -- confirm the wire
 * format. The local variable `connect` also shadows connect(2) inside
 * this function. The send()/recv() calls are issued once each with flags 0;
 * handling of short transfers is not visible here.
 */
122 int establish_connection(void)
124 struct lttng_viewer_cmd cmd
;
125 struct lttng_viewer_connect connect
;
128 cmd
.cmd
= htobe32(VIEWER_CONNECT
);
129 cmd
.data_size
= sizeof(connect
);
132 connect
.major
= htobe32(2);
133 connect
.minor
= htobe32(4);
134 connect
.type
= htobe32(VIEWER_CLIENT_COMMAND
);
137 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
138 } while (ret
< 0 && errno
== EINTR
);
140 fprintf(stderr
, "Error sending cmd\n");
144 ret
= send(control_sock
, &connect
, sizeof(connect
), 0);
145 } while (ret
< 0 && errno
== EINTR
);
147 fprintf(stderr
, "Error sending version\n");
/* The reply overwrites the request structure that was just sent. */
152 ret
= recv(control_sock
, &connect
, sizeof(connect
), 0);
153 } while (ret
< 0 && errno
== EINTR
);
155 fprintf(stderr
, "Error receiving version\n");
158 fprintf(stderr
, " - Received viewer session ID : %" PRIu64
"\n",
159 be64toh(connect
.viewer_session_id
));
160 fprintf(stderr
, " - Received version : %u.%u\n", be32toh(connect
.major
),
161 be32toh(connect
.minor
));
/*
 * list_sessions - ask the relay for its sessions and print them to stderr.
 *
 * Sends VIEWER_LIST_SESSIONS, receives a struct lttng_viewer_list_sessions
 * header, then receives and prints `sessions_count` session records.
 * The id of the first session seen is kept in `first_session` and ends up,
 * cast to int, in `ret`.
 *
 * NOTE(review): be64toh(lsession.id) is a 64-bit value narrowed to `int`
 * (acknowledged by the original comment below). Because `first_session`
 * starts at 0 and is tested with `<= 0`, a session whose id is 0 cannot be
 * told apart from "no session": the value would be overwritten by the next
 * record, and setup_network_live() treats a result of 0 as "nothing to
 * attach to". session_name and hostname are printed with %s straight from
 * the received buffer; NUL termination is not checked in the visible code.
 */
169 int list_sessions(void)
171 struct lttng_viewer_cmd cmd
;
172 struct lttng_viewer_list_sessions list
;
173 struct lttng_viewer_session lsession
;
175 int first_session
= 0;
177 cmd
.cmd
= htobe32(VIEWER_LIST_SESSIONS
);
182 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
183 } while (ret
< 0 && errno
== EINTR
);
185 fprintf(stderr
, "Error sending cmd\n");
190 ret
= recv(control_sock
, &list
, sizeof(list
), 0);
191 } while (ret
< 0 && errno
== EINTR
);
193 fprintf(stderr
, "Error receiving session list\n");
197 fprintf(stderr
, " - %u active session(s)\n", be32toh(list
.sessions_count
));
/* One fixed-size record is received per announced session. */
198 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
200 ret
= recv(control_sock
, &lsession
, sizeof(lsession
), 0);
201 } while (ret
< 0 && errno
== EINTR
);
203 fprintf(stderr
, "Error receiving session\n");
206 fprintf(stderr
, " - %" PRIu64
" : %s on host %s (timer = %u, "
207 "%u client(s) connected)\n",
208 be64toh(lsession
.id
), lsession
.session_name
,
209 lsession
.hostname
, be32toh(lsession
.live_timer
),
210 be32toh(lsession
.clients
));
/* Remember only the first session id; later records are just printed. */
211 if (first_session
<= 0) {
212 first_session
= be64toh(lsession
.id
);
216 /* I know, type mismatch */
217 ret
= (int) first_session
;
/*
 * write_index_header - write the packet-index file header to `fd`.
 *
 * Fills a struct lttng_packet_index_file_hdr with INDEX_MAGIC and the
 * big-endian INDEX_MAJOR / INDEX_MINOR version numbers, then write()s it,
 * retrying while the call fails with EINTR. perror() reports a failure.
 *
 * NOTE(review): the return statement and any handling of a short write
 * are not visible in this excerpt.
 */
223 int write_index_header(int fd
)
225 struct lttng_packet_index_file_hdr hdr
;
228 memcpy(hdr
.magic
, INDEX_MAGIC
, sizeof(hdr
.magic
));
229 hdr
.index_major
= htobe32(INDEX_MAJOR
);
230 hdr
.index_minor
= htobe32(INDEX_MINOR
);
233 ret
= write(fd
, &hdr
, sizeof(hdr
));
234 } while (ret
< 0 && errno
== EINTR
);
236 perror("write index header");
/*
 * attach_session - attach to session `id` and set up one entry per stream.
 *
 * Sends VIEWER_ATTACH_SESSION followed by a request carrying the session
 * id and the seek mode VIEWER_SEEK_LAST, then receives the response. When
 * the status is VIEWER_ATTACH_OK the announced stream count is stored in
 * session->stream_count, session->streams is allocated zeroed, and one
 * struct lttng_viewer_stream is received per stream. For each stream the
 * id and ctf_trace_id are recorded, first_read is set to 1 and an
 * anonymous private buffer of mmap_size bytes is mapped. Metadata streams
 * additionally get metadata_flag set and an output file opened under the
 * "testlivetrace" directory, whose descriptor is stored in the stream's fd.
 *
 * NOTE(review): unlink() is called on "testlivetrace", which is then used
 * as a directory; unlink() does not remove directories, so it only helps
 * when a regular file of that name exists. The results of unlink() and
 * mkdir() are not checked in the visible code. `id` is an int widened by
 * htobe64(). The snprintf() format line and all error paths are not
 * visible in this excerpt.
 */
244 int attach_session(int id
)
246 struct lttng_viewer_cmd cmd
;
247 struct lttng_viewer_attach_session_request rq
;
248 struct lttng_viewer_attach_session_response rp
;
249 struct lttng_viewer_stream stream
;
252 cmd
.cmd
= htobe32(VIEWER_ATTACH_SESSION
);
253 cmd
.data_size
= sizeof(rq
);
256 rq
.session_id
= htobe64(id
);
257 //rq.seek = htobe32(VIEWER_SEEK_BEGINNING);
258 rq
.seek
= htobe32(VIEWER_SEEK_LAST
);
261 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
262 } while (ret
< 0 && errno
== EINTR
);
264 fprintf(stderr
, "Error sending cmd\n");
268 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
269 } while (ret
< 0 && errno
== EINTR
);
271 fprintf(stderr
, "Error sending attach request\n");
276 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
277 } while (ret
< 0 && errno
== EINTR
);
279 fprintf(stderr
, "Error receiving attach response\n");
282 fprintf(stderr
, " - session attach response : %u\n", be32toh(rp
.status
));
283 if (be32toh(rp
.status
) != VIEWER_ATTACH_OK
) {
/* The 32-bit count from the wire is stored in the 64-bit stream_count field. */
288 session
->stream_count
= be32toh(rp
.streams_count
);
289 fprintf(stderr
, " - Waiting for %" PRIu64
" streams\n", session
->stream_count
);
290 session
->streams
= zmalloc(session
->stream_count
*
291 sizeof(struct viewer_stream
));
292 if (!session
->streams
) {
297 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
299 ret
= recv(control_sock
, &stream
, sizeof(stream
), 0);
300 } while (ret
< 0 && errno
== EINTR
);
302 fprintf(stderr
, "Error receiving stream\n");
305 fprintf(stderr
, " - stream %" PRIu64
" : %s/%s\n",
306 be64toh(stream
.id
), stream
.path_name
,
307 stream
.channel_name
);
308 session
->streams
[i
].id
= be64toh(stream
.id
);
310 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
311 session
->streams
[i
].first_read
= 1;
/* Per-stream receive buffer; get_data_packet() copies packets into it. */
312 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
, PROT_READ
| PROT_WRITE
,
313 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
314 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
315 fprintf(stderr
, "mmap error\n");
/* Metadata streams are written to a local file instead of being mapped only. */
320 if (be32toh(stream
.metadata_flag
)) {
321 session
->streams
[i
].metadata_flag
= 1;
322 unlink("testlivetrace");
323 mkdir("testlivetrace", S_IRWXU
| S_IRWXG
);
324 snprintf(session
->streams
[i
].path
,
325 sizeof(session
->streams
[i
].path
),
327 stream
.channel_name
);
328 ret
= open(session
->streams
[i
].path
,
329 O_WRONLY
| O_CREAT
| O_TRUNC
,
330 S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
);
334 session
->streams
[i
].fd
= ret
;
347 void dump_packet_index(struct lttng_packet_index
*index
)
349 printf(" - index : %lu, %lu, %lu, %lu, %lu, %lu, %lu\n",
350 be64toh(index
->offset
),
351 be64toh(index
->packet_size
),
352 be64toh(index
->content_size
),
353 be64toh(index
->timestamp_begin
),
354 be64toh(index
->timestamp_end
),
355 be64toh(index
->events_discarded
),
356 be64toh(index
->stream_id
));
/*
 * get_data_packet - fetch one trace packet for stream index `id`.
 *
 * Sends VIEWER_GET_PACKET followed by a request naming the relay-side
 * stream id (session->streams[id].id) and the requested length, then
 * receives a struct lttng_viewer_trace_packet reply. rp.flags is converted
 * to host order in place and the status is reported on stderr as OK,
 * RETRY, ERR (or NEW_METADATA when that flag is set) or UNKNOWN. The
 * payload length comes from rp.len; a length above mmap_size is rejected,
 * otherwise the payload is received with MSG_WAITALL into the stream's
 * mmap_base buffer.
 *
 * NOTE(review): the rest of the parameter list, the statement that the
 * "Already in big endian" remark refers to (presumably the offset
 * assignment), and the return values of each branch are not visible in
 * this excerpt.
 */
361 int get_data_packet(int id
, uint64_t offset
,
364 struct lttng_viewer_cmd cmd
;
365 struct lttng_viewer_get_packet rq
;
366 struct lttng_viewer_trace_packet rp
;
369 cmd
.cmd
= htobe32(VIEWER_GET_PACKET
);
370 cmd
.data_size
= sizeof(rq
);
/* `id` indexes session->streams; the relay is addressed by that entry's own id. */
373 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
374 /* Already in big endian. */
376 rq
.len
= htobe32(len
);
377 fprintf(stderr
, " - get_packet ");
380 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
381 } while (ret
< 0 && errno
== EINTR
);
383 fprintf(stderr
, "Error sending cmd\n");
387 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
388 } while (ret
< 0 && errno
== EINTR
);
390 fprintf(stderr
, "Error sending get_data_packet request\n");
394 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
395 } while (ret
< 0 && errno
== EINTR
);
397 fprintf(stderr
, "Error receiving data response\n");
/* flags is converted in place; status is converted at each use instead. */
400 rp
.flags
= be32toh(rp
.flags
);
402 switch (be32toh(rp
.status
)) {
403 case VIEWER_GET_PACKET_OK
:
404 fprintf(stderr
, "OK\n");
406 case VIEWER_GET_PACKET_RETRY
:
407 fprintf(stderr
, "RETRY\n");
410 case VIEWER_GET_PACKET_ERR
:
411 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
412 fprintf(stderr
, "NEW_METADATA\n");
416 fprintf(stderr
, "ERR\n");
420 fprintf(stderr
, "UNKNOWN\n");
425 len
= be32toh(rp
.len
);
426 fprintf(stderr
, " - writing %" PRIu64
" bytes to tracefile\n", len
);
/* The destination buffer is exactly mmap_size bytes, so larger packets cannot be stored. */
431 if (len
> mmap_size
) {
432 fprintf(stderr
, "mmap_size not big enough\n");
438 ret
= recv(control_sock
, session
->streams
[id
].mmap_base
, len
, MSG_WAITALL
);
439 } while (ret
< 0 && errno
== EINTR
);
441 fprintf(stderr
, "Error receiving trace packet\n");
451 * Return number of metadata bytes written or a negative value on error.
/*
 * get_new_metadata - fetch pending metadata for the trace of stream `id`.
 *
 * Looks through session->streams for the entry that has metadata_flag set
 * and the same ctf_trace_id as session->streams[id]; if none is found,
 * "No metadata stream found" is reported. Otherwise VIEWER_GET_METADATA is
 * sent for that stream and a struct lttng_viewer_metadata_packet reply is
 * received, whose status is reported on stderr as OK, NO NEW, ERR or
 * UNKNOWN. The payload length is taken from rp.len, the payload is
 * received with MSG_WAITALL into `data` and written to the metadata
 * stream's file descriptor.
 *
 * NOTE(review): `len` comes from the peer and no upper bound on it is
 * visible before the buffer is allocated. The allocation itself, the
 * release of `data`, handling of a short write() and the return values
 * are not visible in this excerpt.
 */
454 int get_new_metadata(int id
)
456 struct lttng_viewer_cmd cmd
;
457 struct lttng_viewer_get_metadata rq
;
458 struct lttng_viewer_metadata_packet rp
;
463 int metadata_stream_id
= -1;
465 cmd
.cmd
= htobe32(VIEWER_GET_METADATA
);
466 cmd
.data_size
= sizeof(rq
);
469 /* find the metadata stream for this ctf_trace */
470 for (i
= 0; i
< session
->stream_count
; i
++) {
471 if (session
->streams
[i
].metadata_flag
&&
472 session
->streams
[i
].ctf_trace_id
==
473 session
->streams
[id
].ctf_trace_id
) {
474 metadata_stream_id
= i
;
/* -1 is the initial value, i.e. the search above matched nothing. */
478 if (metadata_stream_id
< 0) {
479 fprintf(stderr
, "No metadata stream found\n");
484 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
485 fprintf(stderr
, " - get_metadata ");
488 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
489 } while (ret
< 0 && errno
== EINTR
);
491 fprintf(stderr
, "Error sending cmd\n");
495 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
496 } while (ret
< 0 && errno
== EINTR
);
498 fprintf(stderr
, "Error sending get_metadata request\n");
502 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
503 } while (ret
< 0 && errno
== EINTR
);
505 fprintf(stderr
, "Error receiving metadata response\n");
508 switch (be32toh(rp
.status
)) {
509 case VIEWER_METADATA_OK
:
510 fprintf(stderr
, "OK\n");
512 case VIEWER_NO_NEW_METADATA
:
513 fprintf(stderr
, "NO NEW\n");
516 case VIEWER_METADATA_ERR
:
517 fprintf(stderr
, "ERR\n");
521 fprintf(stderr
, "UNKNOWN\n");
/* Unlike the 32-bit length in get_data_packet(), this length is 64 bits on the wire. */
526 len
= be64toh(rp
.len
);
527 fprintf(stderr
, " - writing %" PRIu64
" bytes to metadata\n", len
);
534 perror("relay data zmalloc");
538 ret
= recv(control_sock
, data
, len
, MSG_WAITALL
);
539 } while (ret
< 0 && errno
== EINTR
);
541 fprintf(stderr
, "Error receiving trace packet\n");
546 ret
= write(session
->streams
[metadata_stream_id
].fd
, data
, len
);
547 } while (ret
< 0 && errno
== EINTR
);
562 * Get one index for a stream.
/*
 * get_next_index - request the next packet index of stream index `id`.
 *
 * Sends VIEWER_GET_NEXT_INDEX for session->streams[id].id and receives a
 * struct lttng_viewer_index reply. rp.flags is converted to host order in
 * place, then the status selects the action:
 *   VIEWER_INDEX_INACTIVE  zero *index and set only timestamp_end;
 *   VIEWER_INDEX_OK        copy offset, packet_size, content_size,
 *                          timestamp_begin, timestamp_end and
 *                          events_discarded into *index in host order and,
 *                          when LTTNG_VIEWER_FLAG_NEW_METADATA is set,
 *                          call get_new_metadata(id);
 *   VIEWER_INDEX_RETRY     report "(RETRY)";
 *   VIEWER_INDEX_HUP       mark the stream closed (id = -1ULL, fd = -1);
 *   VIEWER_INDEX_ERR       report "(ERR)";
 *   anything else          report "SHOULD NOT HAPPEN".
 *
 * NOTE(review): the return value of each branch is not visible in this
 * excerpt.
 */
564 int get_next_index(int id
, struct packet_index
*index
)
566 struct lttng_viewer_cmd cmd
;
567 struct lttng_viewer_get_next_index rq
;
568 struct lttng_viewer_index rp
;
571 cmd
.cmd
= htobe32(VIEWER_GET_NEXT_INDEX
);
572 cmd
.data_size
= sizeof(rq
);
575 fprintf(stderr
, " - get next index for stream %" PRIu64
"\n",
576 session
->streams
[id
].id
);
577 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
581 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
582 } while (ret
< 0 && errno
== EINTR
);
584 fprintf(stderr
, "Error sending cmd\n");
588 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
589 } while (ret
< 0 && errno
== EINTR
);
591 fprintf(stderr
, "Error sending get_next_index request\n");
595 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
596 } while (ret
< 0 && errno
== EINTR
);
598 fprintf(stderr
, "Error receiving index response\n");
601 fprintf(stderr
, " - reply : %u ", be32toh(rp
.status
));
603 rp
.flags
= be32toh(rp
.flags
);
605 switch (be32toh(rp
.status
)) {
606 case VIEWER_INDEX_INACTIVE
:
607 fprintf(stderr
, "(INACTIVE)\n");
/* Only the end timestamp is meaningful for an inactive stream; the rest is zeroed. */
608 memset(index
, 0, sizeof(struct packet_index
));
609 index
->timestamp_end
= be64toh(rp
.timestamp_end
);
611 case VIEWER_INDEX_OK
:
612 fprintf(stderr
, "(OK), need metadata update : %u\n",
613 rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
);
614 index
->offset
= be64toh(rp
.offset
);
615 index
->packet_size
= be64toh(rp
.packet_size
);
616 index
->content_size
= be64toh(rp
.content_size
);
617 index
->timestamp_begin
= be64toh(rp
.timestamp_begin
);
618 index
->timestamp_end
= be64toh(rp
.timestamp_end
);
619 index
->events_discarded
= be64toh(rp
.events_discarded
);
621 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
622 fprintf(stderr
, "NEW METADATA NEEDED\n");
623 ret
= get_new_metadata(id
);
629 case VIEWER_INDEX_RETRY
:
630 fprintf(stderr
, "(RETRY)\n");
633 case VIEWER_INDEX_HUP
:
634 fprintf(stderr
, "(HUP)\n");
/* Invalidate both identifiers of the stream that hung up. */
635 session
->streams
[id
].id
= -1ULL;
636 session
->streams
[id
].fd
= -1;
638 case VIEWER_INDEX_ERR
:
639 fprintf(stderr
, "(ERR)\n");
643 fprintf(stderr
, "SHOULD NOT HAPPEN\n");
/*
 * ctf_live_packet_seek - packet-seek callback handed to
 * bt_context_add_trace() by open_trace().
 *
 * pos->fd is not a file descriptor here: open_trace() stores the index of
 * the stream in session->streams in it, and it is used as that index
 * throughout. The function asks the relay for the next index with
 * get_next_index(), copies packet_size and content_size into `pos`, resets
 * mmap_base_offset, updates the stream timestamps from timestamp_end,
 * fetches the packet with get_data_packet() into the stream's buffer,
 * points pos->base_mma at that buffer (allocating the wrapper on first
 * use) and, unless pos->prot is PROT_WRITE, reads the trace packet header
 * and stream packet context with generic_rw().
 *
 * NOTE(review):
 *  - get_next_index() already stores packet_index.offset through
 *    be64toh(); applying be64toh() again here swaps it back on
 *    little-endian hosts, which seems to be what the "Already in big
 *    endian" remark in get_data_packet() relies on. htobe64() would state
 *    the intent -- confirm.
 *  - pos->base_mma is compared with MAP_FAILED right after
 *    mmap_align_set_addr(); base_mma is the zmalloc'ed wrapper, so this
 *    test looks unable to succeed -- confirm.
 *  - the "%lu" conversions print values that get_next_index() fills from
 *    be64toh(); if those fields are uint64_t, PRIu64 is the matching
 *    conversion -- confirm the field types.
 *  - the remaining parameters, the branch taken when packet_size is 0 and
 *    all error paths are not visible in this excerpt.
 */
652 void ctf_live_packet_seek(struct bt_stream_pos
*stream_pos
, size_t index
,
655 struct ctf_stream_pos
*pos
;
656 struct ctf_file_stream
*file_stream
;
657 struct packet_index packet_index
;
660 pos
= ctf_pos(stream_pos
);
661 file_stream
= container_of(pos
, struct ctf_file_stream
, pos
);
663 fprintf(stderr
, "BT GET_NEXT_INDEX %d\n", pos
->fd
);
664 ret
= get_next_index(pos
->fd
, &packet_index
);
666 fprintf(stderr
, "get_next_index failed\n");
670 pos
->packet_size
= packet_index
.packet_size
;
671 pos
->content_size
= packet_index
.content_size
;
672 pos
->mmap_base_offset
= 0;
675 file_stream
->parent
.cycles_timestamp
= packet_index
.timestamp_end
;
676 file_stream
->parent
.real_timestamp
= ctf_get_real_timestamp(
677 &file_stream
->parent
, packet_index
.timestamp_end
);
679 if (pos
->packet_size
== 0) {
683 fprintf(stderr
, "BT GET_DATA_PACKET\n");
/* packet_size is divided by CHAR_BIT to obtain the byte count requested from the relay. */
684 ret
= get_data_packet(pos
->fd
, be64toh(packet_index
.offset
),
685 packet_index
.packet_size
/ CHAR_BIT
);
687 fprintf(stderr
, "get_data_packet failed");
691 fprintf(stderr
, "BT MMAP %d\n", pos
->fd
);
692 fprintf(stderr
, "packet_size : %lu, offset %lu, content_size %lu, timestamp_end : %lu, real : %lu\n",
693 packet_index
.packet_size
,
695 packet_index
.content_size
,
696 packet_index
.timestamp_end
,
697 ctf_get_real_timestamp(
698 &file_stream
->parent
, packet_index
.timestamp_end
));
/* The wrapper is allocated once and then re-pointed at the stream buffer on every call. */
699 if (!pos
->base_mma
) {
700 pos
->base_mma
= zmalloc(sizeof(*pos
->base_mma
));
701 if (!pos
->base_mma
) {
702 fprintf(stderr
, "alloc pos->base_mma\n");
707 mmap_align_set_addr(pos
->base_mma
, session
->streams
[pos
->fd
].mmap_base
);
708 if (pos
->base_mma
== MAP_FAILED
) {
709 perror("Error mmaping");
713 /* update trace_packet_header and stream_packet_context */
714 if (pos
->prot
!= PROT_WRITE
&& file_stream
->parent
.trace_packet_header
) {
715 /* Read packet header */
716 ret
= generic_rw(&pos
->parent
, &file_stream
->parent
.trace_packet_header
->p
);
719 if (pos
->prot
!= PROT_WRITE
&& file_stream
->parent
.stream_packet_context
) {
720 /* Read packet context */
721 ret
= generic_rw(&pos
->parent
, &file_stream
->parent
.stream_packet_context
->p
);
/*
 * open_trace - build a babeltrace context for the attached session.
 *
 * Creates a context in *bt_ctx and an empty list of mmap streams, then
 * walks session->streams:
 *  - for a data stream, a struct bt_mmap_stream is allocated, its fd is
 *    set to the stream's index in session->streams (the real descriptor is
 *    unused, see the original comment below) and it is added to the list;
 *  - for a metadata stream, get_new_metadata() is called repeatedly until
 *    some metadata has been received and a call returns nothing more, then
 *    the metadata file is opened for reading.
 * The trace is finally registered with bt_context_add_trace(), using
 * ctf_live_packet_seek() as the packet-seek callback.
 *
 * NOTE(review): the allocation of new_mmap_stream is not visibly checked
 * before it is dereferenced. The metadata loop keeps polling while
 * total_metadata is 0; how a negative return from get_new_metadata() is
 * handled is not visible here. The trailing lines that reference
 * begin_pos, iter, event and sout have no visible declarations and look
 * like the residue of a disabled event-reading loop -- confirm against
 * the complete file.
 */
729 int open_trace(struct bt_context
**bt_ctx
)
731 struct bt_mmap_stream
*new_mmap_stream
;
732 struct bt_mmap_stream_list mmap_list
;
733 FILE *metadata_fp
= NULL
;
737 *bt_ctx
= bt_context_create();
738 BT_INIT_LIST_HEAD(&mmap_list
.head
);
740 for (i
= 0; i
< session
->stream_count
; i
++) {
741 int total_metadata
= 0;
743 if (!session
->streams
[i
].metadata_flag
) {
744 new_mmap_stream
= zmalloc(sizeof(struct bt_mmap_stream
));
746 * The FD is unused when we handle manually the
747 * packet seek, so we store here the ID of the
748 * stream in our stream list to be able to use it
751 new_mmap_stream
->fd
= i
;
752 bt_list_add(&new_mmap_stream
->list
, &mmap_list
.head
);
754 /* Get all possible metadata before starting */
756 ret
= get_new_metadata(i
);
758 total_metadata
+= ret
;
760 } while (ret
> 0 || total_metadata
== 0);
761 metadata_fp
= fopen(session
->streams
[i
].path
, "r");
766 fprintf(stderr
, "No metadata stream opened\n");
770 ret
= bt_context_add_trace(*bt_ctx
, NULL
, "ctf",
771 ctf_live_packet_seek
, &mmap_list
, metadata_fp
);
773 fprintf(stderr
, "Error adding trace\n");
778 begin_pos.type = BT_SEEK_BEGIN;
779 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
780 while ((event = bt_ctf_iter_read_event(iter)) != NULL) {
782 ret = sout->parent.event_cb(&sout->parent, event->parent->stream);
784 fprintf(stderr, "[error] Writing event failed.\n");
789 ret = bt_iter_next(bt_ctf_get_iter(iter));
792 } else if (ret == EAGAIN) {
/*
 * setup_network_live - entry point: connect to `hostname` and attach to a
 * live session.
 *
 * Allocates the global `session`, then runs connect_viewer(),
 * establish_connection() and list_sessions() in that order. A negative
 * result from list_sessions() is reported as "List error" and a result of
 * 0 as "No session to attach to". attach_session() is then called in a
 * loop that repeats while session->stream_count is 0. session->streams is
 * freed before the final "Exiting" message.
 *
 * NOTE(review): the assignment of session_id, the checks after
 * connect_viewer()/establish_connection()/attach_session(), what happens
 * between attaching and the final free(), and the return statement are
 * not visible in this excerpt. The "Attach session %d" message prints
 * `ret`, while the call itself is made with `session_id`.
 */
804 int setup_network_live(char *hostname
)
809 session
= zmalloc(sizeof(struct live_session
));
814 ret
= connect_viewer(hostname
);
818 fprintf(stderr
, "* Connected\n");
820 fprintf(stderr
, "* Establish connection and version check\n");
821 ret
= establish_connection();
826 fprintf(stderr
, "* List sessions\n");
827 ret
= list_sessions();
829 fprintf(stderr
, "* List error\n");
831 } else if (ret
== 0) {
832 fprintf(stderr
, "* No session to attach to, exiting\n");
839 fprintf(stderr
, "* Attach session %d\n", ret
);
840 ret
= attach_session(session_id
);
/* Keep re-attaching until the relay announces at least one stream. */
844 } while (session
->stream_count
== 0);
850 free(session
->streams
);
851 fprintf(stderr
, "* Exiting %d\n", ret
);