2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
37 #include "lttng-viewer.h"
38 #include "lttng-index.h"
39 #include "network-live.h"
41 #include <babeltrace/babeltrace.h>
42 #include <babeltrace/ctf/events.h>
43 #include <babeltrace/ctf/callbacks.h>
44 #include <babeltrace/ctf/iterator.h>
46 /* for packet_index */
47 #include <babeltrace/ctf/types.h>
49 #include <babeltrace/ctf/metadata.h>
50 #include <babeltrace/ctf-text/types.h>
51 #include <babeltrace/ctf/events-internal.h>
54 * Memory allocation zeroed
56 #define zmalloc(x) calloc(1, x)
57 /* FIXME : completely arbitrary */
58 #define mmap_size 524288
60 static int control_sock
;
61 struct live_session
*session
;
63 struct viewer_stream
{
65 uint64_t ctf_trace_id
;
74 struct viewer_stream
*streams
;
75 uint64_t live_timer_interval
;
76 uint64_t stream_count
;
80 int connect_viewer(char *hostname
)
83 struct sockaddr_in server_addr
;
86 host
= gethostbyname(hostname
);
92 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
98 server_addr
.sin_family
= AF_INET
;
99 server_addr
.sin_port
= htons(5344);
100 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
101 bzero(&(server_addr
.sin_zero
), 8);
103 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
104 sizeof(struct sockaddr
)) == -1) {
110 server_addr
.sin_family
= AF_INET
;
111 server_addr
.sin_port
= htons(5345);
112 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
113 bzero(&(server_addr
.sin_zero
), 8);
122 int establish_connection(void)
124 struct lttng_viewer_cmd cmd
;
125 struct lttng_viewer_connect connect
;
128 cmd
.cmd
= htobe32(VIEWER_CONNECT
);
129 cmd
.data_size
= sizeof(connect
);
132 connect
.major
= htobe32(2);
133 connect
.minor
= htobe32(4);
134 connect
.type
= htobe32(VIEWER_CLIENT_COMMAND
);
137 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
138 } while (ret
< 0 && errno
== EINTR
);
140 fprintf(stderr
, "Error sending cmd\n");
144 ret
= send(control_sock
, &connect
, sizeof(connect
), 0);
145 } while (ret
< 0 && errno
== EINTR
);
147 fprintf(stderr
, "Error sending version\n");
152 ret
= recv(control_sock
, &connect
, sizeof(connect
), 0);
153 } while (ret
< 0 && errno
== EINTR
);
155 fprintf(stderr
, "Error receiving version\n");
158 fprintf(stderr
, " - Received viewer session ID : %" PRIu64
"\n",
159 be64toh(connect
.viewer_session_id
));
160 fprintf(stderr
, " - Received version : %u.%u\n", be32toh(connect
.major
),
161 be32toh(connect
.minor
));
169 int list_sessions(void)
171 struct lttng_viewer_cmd cmd
;
172 struct lttng_viewer_list_sessions list
;
173 struct lttng_viewer_session lsession
;
175 int first_session
= 0;
177 cmd
.cmd
= htobe32(VIEWER_LIST_SESSIONS
);
182 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
183 } while (ret
< 0 && errno
== EINTR
);
185 fprintf(stderr
, "Error sending cmd\n");
190 ret
= recv(control_sock
, &list
, sizeof(list
), 0);
191 } while (ret
< 0 && errno
== EINTR
);
193 fprintf(stderr
, "Error receiving session list\n");
197 fprintf(stderr
, " - %u active session(s)\n", be32toh(list
.sessions_count
));
198 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
200 ret
= recv(control_sock
, &lsession
, sizeof(lsession
), 0);
201 } while (ret
< 0 && errno
== EINTR
);
203 fprintf(stderr
, "Error receiving session\n");
206 fprintf(stderr
, " - %" PRIu64
" : %s on host %s (timer = %u, "
207 "%u client(s) connected)\n",
208 be64toh(lsession
.id
), lsession
.session_name
,
209 lsession
.hostname
, be32toh(lsession
.live_timer
),
210 be32toh(lsession
.clients
));
211 if (first_session
<= 0) {
212 first_session
= be64toh(lsession
.id
);
216 /* I know, type mismatch */
217 ret
= (int) first_session
;
223 int write_index_header(int fd
)
225 struct lttng_packet_index_file_hdr hdr
;
228 memcpy(hdr
.magic
, INDEX_MAGIC
, sizeof(hdr
.magic
));
229 hdr
.index_major
= htobe32(INDEX_MAJOR
);
230 hdr
.index_minor
= htobe32(INDEX_MINOR
);
233 ret
= write(fd
, &hdr
, sizeof(hdr
));
234 } while (ret
< 0 && errno
== EINTR
);
236 perror("write index header");
245 int attach_session(int id
, int begin
)
247 struct lttng_viewer_cmd cmd
;
248 struct lttng_viewer_attach_session_request rq
;
249 struct lttng_viewer_attach_session_response rp
;
250 struct lttng_viewer_stream stream
;
253 cmd
.cmd
= htobe32(VIEWER_ATTACH_SESSION
);
254 cmd
.data_size
= sizeof(rq
);
257 rq
.session_id
= htobe64(id
);
259 rq
.seek
= htobe32(VIEWER_SEEK_BEGINNING
);
261 rq
.seek
= htobe32(VIEWER_SEEK_LAST
);
265 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
266 } while (ret
< 0 && errno
== EINTR
);
268 fprintf(stderr
, "Error sending cmd\n");
272 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
273 } while (ret
< 0 && errno
== EINTR
);
275 fprintf(stderr
, "Error sending attach request\n");
280 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
281 } while (ret
< 0 && errno
== EINTR
);
283 fprintf(stderr
, "Error receiving attach response\n");
286 fprintf(stderr
, " - session attach response : %u\n", be32toh(rp
.status
));
287 if (be32toh(rp
.status
) != VIEWER_ATTACH_OK
) {
292 session
->stream_count
= be32toh(rp
.streams_count
);
293 fprintf(stderr
, " - Waiting for %" PRIu64
" streams\n", session
->stream_count
);
294 session
->streams
= zmalloc(session
->stream_count
*
295 sizeof(struct viewer_stream
));
296 if (!session
->streams
) {
301 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
303 ret
= recv(control_sock
, &stream
, sizeof(stream
), 0);
304 } while (ret
< 0 && errno
== EINTR
);
306 fprintf(stderr
, "Error receiving stream\n");
309 fprintf(stderr
, " - stream %" PRIu64
" : %s/%s\n",
310 be64toh(stream
.id
), stream
.path_name
,
311 stream
.channel_name
);
312 session
->streams
[i
].id
= be64toh(stream
.id
);
314 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
315 session
->streams
[i
].first_read
= 1;
316 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
, PROT_READ
| PROT_WRITE
,
317 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
318 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
319 fprintf(stderr
, "mmap error\n");
324 if (be32toh(stream
.metadata_flag
)) {
325 session
->streams
[i
].metadata_flag
= 1;
326 unlink("testlivetrace");
327 mkdir("testlivetrace", S_IRWXU
| S_IRWXG
);
328 snprintf(session
->streams
[i
].path
,
329 sizeof(session
->streams
[i
].path
),
331 stream
.channel_name
);
332 ret
= open(session
->streams
[i
].path
,
333 O_WRONLY
| O_CREAT
| O_TRUNC
,
334 S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
);
338 session
->streams
[i
].fd
= ret
;
351 void dump_packet_index(struct lttng_packet_index
*index
)
353 printf(" - index : %lu, %lu, %lu, %lu, %lu, %lu, %lu\n",
354 be64toh(index
->offset
),
355 be64toh(index
->packet_size
),
356 be64toh(index
->content_size
),
357 be64toh(index
->timestamp_begin
),
358 be64toh(index
->timestamp_end
),
359 be64toh(index
->events_discarded
),
360 be64toh(index
->stream_id
));
365 int get_data_packet(int id
, uint64_t offset
,
368 struct lttng_viewer_cmd cmd
;
369 struct lttng_viewer_get_packet rq
;
370 struct lttng_viewer_trace_packet rp
;
373 cmd
.cmd
= htobe32(VIEWER_GET_PACKET
);
374 cmd
.data_size
= sizeof(rq
);
377 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
378 /* Already in big endian. */
380 rq
.len
= htobe32(len
);
381 fprintf(stderr
, " - get_packet ");
384 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
385 } while (ret
< 0 && errno
== EINTR
);
387 fprintf(stderr
, "Error sending cmd\n");
391 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
392 } while (ret
< 0 && errno
== EINTR
);
394 fprintf(stderr
, "Error sending get_data_packet request\n");
398 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
399 } while (ret
< 0 && errno
== EINTR
);
401 fprintf(stderr
, "Error receiving data response\n");
404 rp
.flags
= be32toh(rp
.flags
);
406 switch (be32toh(rp
.status
)) {
407 case VIEWER_GET_PACKET_OK
:
408 fprintf(stderr
, "OK\n");
410 case VIEWER_GET_PACKET_RETRY
:
411 fprintf(stderr
, "RETRY\n");
414 case VIEWER_GET_PACKET_ERR
:
415 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
416 fprintf(stderr
, "NEW_METADATA\n");
420 fprintf(stderr
, "ERR\n");
424 fprintf(stderr
, "UNKNOWN\n");
429 len
= be32toh(rp
.len
);
430 fprintf(stderr
, " - writing %" PRIu64
" bytes to tracefile\n", len
);
435 if (len
> mmap_size
) {
436 fprintf(stderr
, "mmap_size not big enough\n");
442 ret
= recv(control_sock
, session
->streams
[id
].mmap_base
, len
, MSG_WAITALL
);
443 } while (ret
< 0 && errno
== EINTR
);
445 fprintf(stderr
, "Error receiving trace packet\n");
455 * Return number of metadata bytes written or a negative value on error.
458 int get_new_metadata(int id
)
460 struct lttng_viewer_cmd cmd
;
461 struct lttng_viewer_get_metadata rq
;
462 struct lttng_viewer_metadata_packet rp
;
467 int metadata_stream_id
= -1;
469 cmd
.cmd
= htobe32(VIEWER_GET_METADATA
);
470 cmd
.data_size
= sizeof(rq
);
473 /* find the metadata stream for this ctf_trace */
474 for (i
= 0; i
< session
->stream_count
; i
++) {
475 if (session
->streams
[i
].metadata_flag
&&
476 session
->streams
[i
].ctf_trace_id
==
477 session
->streams
[id
].ctf_trace_id
) {
478 metadata_stream_id
= i
;
482 if (metadata_stream_id
< 0) {
483 fprintf(stderr
, "No metadata stream found\n");
488 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
489 fprintf(stderr
, " - get_metadata ");
492 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
493 } while (ret
< 0 && errno
== EINTR
);
495 fprintf(stderr
, "Error sending cmd\n");
499 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
500 } while (ret
< 0 && errno
== EINTR
);
502 fprintf(stderr
, "Error sending get_metadata request\n");
506 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
507 } while (ret
< 0 && errno
== EINTR
);
509 fprintf(stderr
, "Error receiving metadata response\n");
512 switch (be32toh(rp
.status
)) {
513 case VIEWER_METADATA_OK
:
514 fprintf(stderr
, "OK\n");
516 case VIEWER_NO_NEW_METADATA
:
517 fprintf(stderr
, "NO NEW\n");
520 case VIEWER_METADATA_ERR
:
521 fprintf(stderr
, "ERR\n");
525 fprintf(stderr
, "UNKNOWN\n");
530 len
= be64toh(rp
.len
);
531 fprintf(stderr
, " - writing %" PRIu64
" bytes to metadata\n", len
);
538 perror("relay data zmalloc");
542 ret
= recv(control_sock
, data
, len
, MSG_WAITALL
);
543 } while (ret
< 0 && errno
== EINTR
);
545 fprintf(stderr
, "Error receiving trace packet\n");
550 ret
= write(session
->streams
[metadata_stream_id
].fd
, data
, len
);
551 } while (ret
< 0 && errno
== EINTR
);
566 * Get one index for a stream.
568 int get_next_index(int id
, struct packet_index
*index
)
570 struct lttng_viewer_cmd cmd
;
571 struct lttng_viewer_get_next_index rq
;
572 struct lttng_viewer_index rp
;
575 cmd
.cmd
= htobe32(VIEWER_GET_NEXT_INDEX
);
576 cmd
.data_size
= sizeof(rq
);
579 fprintf(stderr
, " - get next index for stream %" PRIu64
"\n",
580 session
->streams
[id
].id
);
581 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
585 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
586 } while (ret
< 0 && errno
== EINTR
);
588 fprintf(stderr
, "Error sending cmd\n");
592 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
593 } while (ret
< 0 && errno
== EINTR
);
595 fprintf(stderr
, "Error sending get_next_index request\n");
599 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
600 } while (ret
< 0 && errno
== EINTR
);
602 fprintf(stderr
, "Error receiving index response\n");
605 fprintf(stderr
, " - reply : %u ", be32toh(rp
.status
));
607 rp
.flags
= be32toh(rp
.flags
);
609 switch (be32toh(rp
.status
)) {
610 case VIEWER_INDEX_INACTIVE
:
611 fprintf(stderr
, "(INACTIVE)\n");
612 memset(index
, 0, sizeof(struct packet_index
));
613 index
->timestamp_end
= be64toh(rp
.timestamp_end
);
615 case VIEWER_INDEX_OK
:
616 fprintf(stderr
, "(OK), need metadata update : %u\n",
617 rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
);
618 index
->offset
= be64toh(rp
.offset
);
619 index
->packet_size
= be64toh(rp
.packet_size
);
620 index
->content_size
= be64toh(rp
.content_size
);
621 index
->timestamp_begin
= be64toh(rp
.timestamp_begin
);
622 index
->timestamp_end
= be64toh(rp
.timestamp_end
);
623 index
->events_discarded
= be64toh(rp
.events_discarded
);
625 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
626 fprintf(stderr
, "NEW METADATA NEEDED\n");
627 ret
= get_new_metadata(id
);
633 case VIEWER_INDEX_RETRY
:
634 fprintf(stderr
, "(RETRY)\n");
637 case VIEWER_INDEX_HUP
:
638 fprintf(stderr
, "(HUP)\n");
639 session
->streams
[id
].id
= -1ULL;
640 session
->streams
[id
].fd
= -1;
642 case VIEWER_INDEX_ERR
:
643 fprintf(stderr
, "(ERR)\n");
647 fprintf(stderr
, "SHOULD NOT HAPPEN\n");
656 void ctf_live_packet_seek(struct bt_stream_pos
*stream_pos
, size_t index
,
659 struct ctf_stream_pos
*pos
;
660 struct ctf_file_stream
*file_stream
;
661 struct packet_index packet_index
;
664 pos
= ctf_pos(stream_pos
);
665 file_stream
= container_of(pos
, struct ctf_file_stream
, pos
);
667 fprintf(stderr
, "BT GET_NEXT_INDEX %d\n", pos
->fd
);
668 ret
= get_next_index(pos
->fd
, &packet_index
);
670 fprintf(stderr
, "get_next_index failed\n");
674 pos
->packet_size
= packet_index
.packet_size
;
675 pos
->content_size
= packet_index
.content_size
;
676 pos
->mmap_base_offset
= 0;
679 file_stream
->parent
.cycles_timestamp
= packet_index
.timestamp_end
;
680 file_stream
->parent
.real_timestamp
= ctf_get_real_timestamp(
681 &file_stream
->parent
, packet_index
.timestamp_end
);
683 if (pos
->packet_size
== 0) {
687 fprintf(stderr
, "BT GET_DATA_PACKET\n");
688 ret
= get_data_packet(pos
->fd
, be64toh(packet_index
.offset
),
689 packet_index
.packet_size
/ CHAR_BIT
);
691 fprintf(stderr
, "get_data_packet failed");
695 fprintf(stderr
, "BT MMAP %d\n", pos
->fd
);
696 fprintf(stderr
, "packet_size : %lu, offset %lu, content_size %lu, timestamp_end : %lu, real : %lu\n",
697 packet_index
.packet_size
,
699 packet_index
.content_size
,
700 packet_index
.timestamp_end
,
701 ctf_get_real_timestamp(
702 &file_stream
->parent
, packet_index
.timestamp_end
));
703 if (!pos
->base_mma
) {
704 pos
->base_mma
= zmalloc(sizeof(*pos
->base_mma
));
705 if (!pos
->base_mma
) {
706 fprintf(stderr
, "alloc pos->base_mma\n");
711 mmap_align_set_addr(pos
->base_mma
, session
->streams
[pos
->fd
].mmap_base
);
712 if (pos
->base_mma
== MAP_FAILED
) {
713 perror("Error mmaping");
717 /* update trace_packet_header and stream_packet_context */
718 if (pos
->prot
!= PROT_WRITE
&& file_stream
->parent
.trace_packet_header
) {
719 /* Read packet header */
720 ret
= generic_rw(&pos
->parent
, &file_stream
->parent
.trace_packet_header
->p
);
723 if (pos
->prot
!= PROT_WRITE
&& file_stream
->parent
.stream_packet_context
) {
724 /* Read packet context */
725 ret
= generic_rw(&pos
->parent
, &file_stream
->parent
.stream_packet_context
->p
);
733 int open_trace(struct bt_context
**bt_ctx
)
735 struct bt_mmap_stream
*new_mmap_stream
;
736 struct bt_mmap_stream_list mmap_list
;
737 FILE *metadata_fp
= NULL
;
741 *bt_ctx
= bt_context_create();
742 BT_INIT_LIST_HEAD(&mmap_list
.head
);
744 for (i
= 0; i
< session
->stream_count
; i
++) {
745 int total_metadata
= 0;
747 if (!session
->streams
[i
].metadata_flag
) {
748 new_mmap_stream
= zmalloc(sizeof(struct bt_mmap_stream
));
750 * The FD is unused when we handle manually the
751 * packet seek, so we store here the ID of the
752 * stream in our stream list to be able to use it
755 new_mmap_stream
->fd
= i
;
756 bt_list_add(&new_mmap_stream
->list
, &mmap_list
.head
);
758 /* Get all possible metadata before starting */
760 ret
= get_new_metadata(i
);
762 total_metadata
+= ret
;
764 } while (ret
> 0 || total_metadata
== 0);
765 metadata_fp
= fopen(session
->streams
[i
].path
, "r");
770 fprintf(stderr
, "No metadata stream opened\n");
774 ret
= bt_context_add_trace(*bt_ctx
, NULL
, "ctf",
775 ctf_live_packet_seek
, &mmap_list
, metadata_fp
);
777 fprintf(stderr
, "Error adding trace\n");
782 begin_pos.type = BT_SEEK_BEGIN;
783 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
784 while ((event = bt_ctf_iter_read_event(iter)) != NULL) {
786 ret = sout->parent.event_cb(&sout->parent, event->parent->stream);
788 fprintf(stderr, "[error] Writing event failed.\n");
793 ret = bt_iter_next(bt_ctf_get_iter(iter));
796 } else if (ret == EAGAIN) {
808 int setup_network_live(char *hostname
, int begin
)
813 session
= zmalloc(sizeof(struct live_session
));
818 ret
= connect_viewer(hostname
);
822 fprintf(stderr
, "* Connected\n");
824 fprintf(stderr
, "* Establish connection and version check\n");
825 ret
= establish_connection();
830 fprintf(stderr
, "* List sessions\n");
831 ret
= list_sessions();
833 fprintf(stderr
, "* List error\n");
835 } else if (ret
== 0) {
836 fprintf(stderr
, "* No session to attach to, exiting\n");
843 fprintf(stderr
, "* Attach session %d\n", ret
);
844 ret
= attach_session(session_id
, begin
);
848 } while (session
->stream_count
== 0);
854 free(session
->streams
);
855 fprintf(stderr
, "* Exiting %d\n", ret
);