Big live update, see details
[lttngtop.git] / src / network-live.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include "lttng-viewer.h"
38 #include "lttng-index.h"
39 #include "network-live.h"
40
41 #include <babeltrace/babeltrace.h>
42 #include <babeltrace/ctf/events.h>
43 #include <babeltrace/ctf/callbacks.h>
44 #include <babeltrace/ctf/iterator.h>
45
46 /* for packet_index */
47 #include <babeltrace/ctf/types.h>
48
49 #include <babeltrace/ctf/metadata.h>
50 #include <babeltrace/ctf-text/types.h>
51 #include <babeltrace/ctf/events-internal.h>
52
/*
 * Zero-initialized allocation helper: expands to a calloc() of one
 * object of x bytes.
 */
#define zmalloc(x) calloc(1, (x))

/*
 * FIXME: arbitrary size (512 KiB) of the anonymous buffer mapped for each
 * stream; a data packet larger than this is rejected.
 */
#define mmap_size 524288

/* Socket connected to the relay, opened by connect_viewer(). */
static int control_sock;

/* State of the live session, allocated by setup_network_live(). */
struct live_session *session;
62
/*
 * Local bookkeeping for one stream announced by the relay when
 * attaching to a session.
 */
struct viewer_stream {
	/* Stream id received from the relay; set to -1ULL on hang-up. */
	uint64_t id;
	/* Id of the CTF trace; used to pair a stream with its metadata. */
	uint64_t ctf_trace_id;
	/* Anonymous mapping of mmap_size bytes receiving packet data. */
	void *mmap_base;
	/* Descriptor of the local metadata file (metadata streams only). */
	int fd;
	/* Non-zero when this stream carries the metadata. */
	int metadata_flag;
	/* Set to 1 when the stream is attached. */
	int first_read;
	/* Path of the local metadata file (metadata streams only). */
	char path[PATH_MAX];
};
72
/*
 * State of the live session this viewer is attached to.
 */
struct live_session {
	/* Array of stream_count entries, allocated when attaching. */
	struct viewer_stream *streams;
	/* NOTE(review): not written anywhere in this file — confirm usage. */
	uint64_t live_timer_interval;
	/* Number of streams announced by the relay in the attach response. */
	uint64_t stream_count;
};
78
79 static
80 int connect_viewer(char *hostname)
81 {
82 struct hostent *host;
83 struct sockaddr_in server_addr;
84 int ret;
85
86 host = gethostbyname(hostname);
87 if (!host) {
88 ret = -1;
89 goto end;
90 }
91
92 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
93 perror("Socket");
94 ret = -1;
95 goto end;
96 }
97
98 server_addr.sin_family = AF_INET;
99 server_addr.sin_port = htons(5344);
100 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
101 bzero(&(server_addr.sin_zero), 8);
102
103 if (connect(control_sock, (struct sockaddr *) &server_addr,
104 sizeof(struct sockaddr)) == -1) {
105 perror("Connect");
106 ret = -1;
107 goto end;
108 }
109
110 server_addr.sin_family = AF_INET;
111 server_addr.sin_port = htons(5345);
112 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
113 bzero(&(server_addr.sin_zero), 8);
114
115 ret = 0;
116
117 end:
118 return ret;
119 }
120
121 static
122 int establish_connection(void)
123 {
124 struct lttng_viewer_cmd cmd;
125 struct lttng_viewer_connect connect;
126 int ret;
127
128 cmd.cmd = htobe32(VIEWER_CONNECT);
129 cmd.data_size = sizeof(connect);
130 cmd.cmd_version = 0;
131
132 connect.major = htobe32(2);
133 connect.minor = htobe32(4);
134 connect.type = htobe32(VIEWER_CLIENT_COMMAND);
135
136 do {
137 ret = send(control_sock, &cmd, sizeof(cmd), 0);
138 } while (ret < 0 && errno == EINTR);
139 if (ret < 0) {
140 fprintf(stderr, "Error sending cmd\n");
141 goto error;
142 }
143 do {
144 ret = send(control_sock, &connect, sizeof(connect), 0);
145 } while (ret < 0 && errno == EINTR);
146 if (ret < 0) {
147 fprintf(stderr, "Error sending version\n");
148 goto error;
149 }
150
151 do {
152 ret = recv(control_sock, &connect, sizeof(connect), 0);
153 } while (ret < 0 && errno == EINTR);
154 if (ret < 0) {
155 fprintf(stderr, "Error receiving version\n");
156 goto error;
157 }
158 fprintf(stderr, " - Received viewer session ID : %" PRIu64 "\n",
159 be64toh(connect.viewer_session_id));
160 fprintf(stderr, " - Received version : %u.%u\n", be32toh(connect.major),
161 be32toh(connect.minor));
162
163 ret = 0;
164
165 error:
166 return ret;
167 }
168
169 int list_sessions(void)
170 {
171 struct lttng_viewer_cmd cmd;
172 struct lttng_viewer_list_sessions list;
173 struct lttng_viewer_session lsession;
174 int i, ret;
175 int first_session = 0;
176
177 cmd.cmd = htobe32(VIEWER_LIST_SESSIONS);
178 cmd.data_size = 0;
179 cmd.cmd_version = 0;
180
181 do {
182 ret = send(control_sock, &cmd, sizeof(cmd), 0);
183 } while (ret < 0 && errno == EINTR);
184 if (ret < 0) {
185 fprintf(stderr, "Error sending cmd\n");
186 goto error;
187 }
188
189 do {
190 ret = recv(control_sock, &list, sizeof(list), 0);
191 } while (ret < 0 && errno == EINTR);
192 if (ret < 0) {
193 fprintf(stderr, "Error receiving session list\n");
194 goto error;
195 }
196
197 fprintf(stderr, " - %u active session(s)\n", be32toh(list.sessions_count));
198 for (i = 0; i < be32toh(list.sessions_count); i++) {
199 do {
200 ret = recv(control_sock, &lsession, sizeof(lsession), 0);
201 } while (ret < 0 && errno == EINTR);
202 if (ret < 0) {
203 fprintf(stderr, "Error receiving session\n");
204 goto error;
205 }
206 fprintf(stderr, " - %" PRIu64 " : %s on host %s (timer = %u, "
207 "%u client(s) connected)\n",
208 be64toh(lsession.id), lsession.session_name,
209 lsession.hostname, be32toh(lsession.live_timer),
210 be32toh(lsession.clients));
211 if (first_session <= 0) {
212 first_session = be64toh(lsession.id);
213 }
214 }
215
216 /* I know, type mismatch */
217 ret = (int) first_session;
218
219 error:
220 return ret;
221 }
222
223 int write_index_header(int fd)
224 {
225 struct lttng_packet_index_file_hdr hdr;
226 int ret;
227
228 memcpy(hdr.magic, INDEX_MAGIC, sizeof(hdr.magic));
229 hdr.index_major = htobe32(INDEX_MAJOR);
230 hdr.index_minor = htobe32(INDEX_MINOR);
231
232 do {
233 ret = write(fd, &hdr, sizeof(hdr));
234 } while (ret < 0 && errno == EINTR);
235 if (ret < 0) {
236 perror("write index header");
237 goto error;
238 }
239
240 error:
241 return ret;
242 }
243
244 int attach_session(int id)
245 {
246 struct lttng_viewer_cmd cmd;
247 struct lttng_viewer_attach_session_request rq;
248 struct lttng_viewer_attach_session_response rp;
249 struct lttng_viewer_stream stream;
250 int ret, i;
251
252 cmd.cmd = htobe32(VIEWER_ATTACH_SESSION);
253 cmd.data_size = sizeof(rq);
254 cmd.cmd_version = 0;
255
256 rq.session_id = htobe64(id);
257 //rq.seek = htobe32(VIEWER_SEEK_BEGINNING);
258 rq.seek = htobe32(VIEWER_SEEK_LAST);
259
260 do {
261 ret = send(control_sock, &cmd, sizeof(cmd), 0);
262 } while (ret < 0 && errno == EINTR);
263 if (ret < 0) {
264 fprintf(stderr, "Error sending cmd\n");
265 goto error;
266 }
267 do {
268 ret = send(control_sock, &rq, sizeof(rq), 0);
269 } while (ret < 0 && errno == EINTR);
270 if (ret < 0) {
271 fprintf(stderr, "Error sending attach request\n");
272 goto error;
273 }
274
275 do {
276 ret = recv(control_sock, &rp, sizeof(rp), 0);
277 } while (ret < 0 && errno == EINTR);
278 if (ret < 0) {
279 fprintf(stderr, "Error receiving attach response\n");
280 goto error;
281 }
282 fprintf(stderr, " - session attach response : %u\n", be32toh(rp.status));
283 if (be32toh(rp.status) != VIEWER_ATTACH_OK) {
284 ret = 1;
285 goto end;
286 }
287
288 session->stream_count = be32toh(rp.streams_count);
289 fprintf(stderr, " - Waiting for %" PRIu64 " streams\n", session->stream_count);
290 session->streams = zmalloc(session->stream_count *
291 sizeof(struct viewer_stream));
292 if (!session->streams) {
293 ret = -1;
294 goto error;
295 }
296
297 for (i = 0; i < be32toh(rp.streams_count); i++) {
298 do {
299 ret = recv(control_sock, &stream, sizeof(stream), 0);
300 } while (ret < 0 && errno == EINTR);
301 if (ret < 0) {
302 fprintf(stderr, "Error receiving stream\n");
303 goto error;
304 }
305 fprintf(stderr, " - stream %" PRIu64 " : %s/%s\n",
306 be64toh(stream.id), stream.path_name,
307 stream.channel_name);
308 session->streams[i].id = be64toh(stream.id);
309
310 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
311 session->streams[i].first_read = 1;
312 session->streams[i].mmap_base = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
313 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
314 if (session->streams[i].mmap_base == MAP_FAILED) {
315 fprintf(stderr, "mmap error\n");
316 ret = -1;
317 goto error;
318 }
319
320 if (be32toh(stream.metadata_flag)) {
321 session->streams[i].metadata_flag = 1;
322 unlink("testlivetrace");
323 mkdir("testlivetrace", S_IRWXU | S_IRWXG);
324 snprintf(session->streams[i].path,
325 sizeof(session->streams[i].path),
326 "testlivetrace/%s",
327 stream.channel_name);
328 ret = open(session->streams[i].path,
329 O_WRONLY | O_CREAT | O_TRUNC,
330 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
331 if (ret < 0) {
332 goto error;
333 }
334 session->streams[i].fd = ret;
335 }
336 }
337 ret = 0;
338
339 end:
340 error:
341 return ret;
342 }
343
#if 0
/*
 * Debug helper (compiled out): print every field of a packet index,
 * converting each one from big endian to host byte order.
 */
static
void dump_packet_index(struct lttng_packet_index *index)
{
	/* be64toh() yields a uint64_t: PRIu64 is the matching conversion. */
	printf(" - index : %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64
			", %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n",
			be64toh(index->offset),
			be64toh(index->packet_size),
			be64toh(index->content_size),
			be64toh(index->timestamp_begin),
			be64toh(index->timestamp_end),
			be64toh(index->events_discarded),
			be64toh(index->stream_id));
}
#endif
359
360 static
361 int get_data_packet(int id, uint64_t offset,
362 uint64_t len)
363 {
364 struct lttng_viewer_cmd cmd;
365 struct lttng_viewer_get_packet rq;
366 struct lttng_viewer_trace_packet rp;
367 int ret;
368
369 cmd.cmd = htobe32(VIEWER_GET_PACKET);
370 cmd.data_size = sizeof(rq);
371 cmd.cmd_version = 0;
372
373 rq.stream_id = htobe64(session->streams[id].id);
374 /* Already in big endian. */
375 rq.offset = offset;
376 rq.len = htobe32(len);
377 fprintf(stderr, " - get_packet ");
378
379 do {
380 ret = send(control_sock, &cmd, sizeof(cmd), 0);
381 } while (ret < 0 && errno == EINTR);
382 if (ret < 0) {
383 fprintf(stderr, "Error sending cmd\n");
384 goto error;
385 }
386 do {
387 ret = send(control_sock, &rq, sizeof(rq), 0);
388 } while (ret < 0 && errno == EINTR);
389 if (ret < 0) {
390 fprintf(stderr, "Error sending get_data_packet request\n");
391 goto error;
392 }
393 do {
394 ret = recv(control_sock, &rp, sizeof(rp), 0);
395 } while (ret < 0 && errno == EINTR);
396 if (ret < 0) {
397 fprintf(stderr, "Error receiving data response\n");
398 goto error;
399 }
400 rp.flags = be32toh(rp.flags);
401
402 switch (be32toh(rp.status)) {
403 case VIEWER_GET_PACKET_OK:
404 fprintf(stderr, "OK\n");
405 break;
406 case VIEWER_GET_PACKET_RETRY:
407 fprintf(stderr, "RETRY\n");
408 ret = -1;
409 goto end;
410 case VIEWER_GET_PACKET_ERR:
411 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
412 fprintf(stderr, "NEW_METADATA\n");
413 ret = 0;
414 goto end;
415 }
416 fprintf(stderr, "ERR\n");
417 ret = -1;
418 goto end;
419 default:
420 fprintf(stderr, "UNKNOWN\n");
421 ret = -1;
422 goto end;
423 }
424
425 len = be32toh(rp.len);
426 fprintf(stderr, " - writing %" PRIu64" bytes to tracefile\n", len);
427 if (len <= 0) {
428 goto end;
429 }
430
431 if (len > mmap_size) {
432 fprintf(stderr, "mmap_size not big enough\n");
433 ret = -1;
434 goto error;
435 }
436
437 do {
438 ret = recv(control_sock, session->streams[id].mmap_base, len, MSG_WAITALL);
439 } while (ret < 0 && errno == EINTR);
440 if (ret < 0) {
441 fprintf(stderr, "Error receiving trace packet\n");
442 goto error;
443 }
444
445 end:
446 error:
447 return ret;
448 }
449
450 /*
451 * Return number of metadata bytes written or a negative value on error.
452 */
453 static
454 int get_new_metadata(int id)
455 {
456 struct lttng_viewer_cmd cmd;
457 struct lttng_viewer_get_metadata rq;
458 struct lttng_viewer_metadata_packet rp;
459 int ret;
460 uint64_t i;
461 char *data = NULL;
462 uint64_t len = 0;
463 int metadata_stream_id = -1;
464
465 cmd.cmd = htobe32(VIEWER_GET_METADATA);
466 cmd.data_size = sizeof(rq);
467 cmd.cmd_version = 0;
468
469 /* find the metadata stream for this ctf_trace */
470 for (i = 0; i < session->stream_count; i++) {
471 if (session->streams[i].metadata_flag &&
472 session->streams[i].ctf_trace_id ==
473 session->streams[id].ctf_trace_id) {
474 metadata_stream_id = i;
475 break;
476 }
477 }
478 if (metadata_stream_id < 0) {
479 fprintf(stderr, "No metadata stream found\n");
480 ret = -1;
481 goto error;
482 }
483
484 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
485 fprintf(stderr, " - get_metadata ");
486
487 do {
488 ret = send(control_sock, &cmd, sizeof(cmd), 0);
489 } while (ret < 0 && errno == EINTR);
490 if (ret < 0) {
491 fprintf(stderr, "Error sending cmd\n");
492 goto error;
493 }
494 do {
495 ret = send(control_sock, &rq, sizeof(rq), 0);
496 } while (ret < 0 && errno == EINTR);
497 if (ret < 0) {
498 fprintf(stderr, "Error sending get_metadata request\n");
499 goto error;
500 }
501 do {
502 ret = recv(control_sock, &rp, sizeof(rp), 0);
503 } while (ret < 0 && errno == EINTR);
504 if (ret < 0) {
505 fprintf(stderr, "Error receiving metadata response\n");
506 goto error;
507 }
508 switch (be32toh(rp.status)) {
509 case VIEWER_METADATA_OK:
510 fprintf(stderr, "OK\n");
511 break;
512 case VIEWER_NO_NEW_METADATA:
513 fprintf(stderr, "NO NEW\n");
514 ret = -1;
515 goto end;
516 case VIEWER_METADATA_ERR:
517 fprintf(stderr, "ERR\n");
518 ret = -1;
519 goto end;
520 default:
521 fprintf(stderr, "UNKNOWN\n");
522 ret = -1;
523 goto end;
524 }
525
526 len = be64toh(rp.len);
527 fprintf(stderr, " - writing %" PRIu64" bytes to metadata\n", len);
528 if (len <= 0) {
529 goto end;
530 }
531
532 data = zmalloc(len);
533 if (!data) {
534 perror("relay data zmalloc");
535 goto error;
536 }
537 do {
538 ret = recv(control_sock, data, len, MSG_WAITALL);
539 } while (ret < 0 && errno == EINTR);
540 if (ret < 0) {
541 fprintf(stderr, "Error receiving trace packet\n");
542 free(data);
543 goto error;
544 }
545 do {
546 ret = write(session->streams[metadata_stream_id].fd, data, len);
547 } while (ret < 0 && errno == EINTR);
548 if (ret < 0) {
549 free(data);
550 goto error;
551 }
552 free(data);
553
554 /* FIXME : bad */
555 ret = (int) len;
556 end:
557 error:
558 return ret;
559 }
560
561 /*
562 * Get one index for a stream.
563 */
564 int get_next_index(int id, struct packet_index *index)
565 {
566 struct lttng_viewer_cmd cmd;
567 struct lttng_viewer_get_next_index rq;
568 struct lttng_viewer_index rp;
569 int ret;
570
571 cmd.cmd = htobe32(VIEWER_GET_NEXT_INDEX);
572 cmd.data_size = sizeof(rq);
573 cmd.cmd_version = 0;
574
575 fprintf(stderr, " - get next index for stream %" PRIu64 "\n",
576 session->streams[id].id);
577 rq.stream_id = htobe64(session->streams[id].id);
578
579 retry:
580 do {
581 ret = send(control_sock, &cmd, sizeof(cmd), 0);
582 } while (ret < 0 && errno == EINTR);
583 if (ret < 0) {
584 fprintf(stderr, "Error sending cmd\n");
585 goto error;
586 }
587 do {
588 ret = send(control_sock, &rq, sizeof(rq), 0);
589 } while (ret < 0 && errno == EINTR);
590 if (ret < 0) {
591 fprintf(stderr, "Error sending get_next_index request\n");
592 goto error;
593 }
594 do {
595 ret = recv(control_sock, &rp, sizeof(rp), 0);
596 } while (ret < 0 && errno == EINTR);
597 if (ret < 0) {
598 fprintf(stderr, "Error receiving index response\n");
599 goto error;
600 }
601 fprintf(stderr, " - reply : %u ", be32toh(rp.status));
602
603 rp.flags = be32toh(rp.flags);
604
605 switch (be32toh(rp.status)) {
606 case VIEWER_INDEX_INACTIVE:
607 fprintf(stderr, "(INACTIVE)\n");
608 memset(index, 0, sizeof(struct packet_index));
609 index->timestamp_end = be64toh(rp.timestamp_end);
610 break;
611 case VIEWER_INDEX_OK:
612 fprintf(stderr, "(OK), need metadata update : %u\n",
613 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
614 index->offset = be64toh(rp.offset);
615 index->packet_size = be64toh(rp.packet_size);
616 index->content_size = be64toh(rp.content_size);
617 index->timestamp_begin = be64toh(rp.timestamp_begin);
618 index->timestamp_end = be64toh(rp.timestamp_end);
619 index->events_discarded = be64toh(rp.events_discarded);
620
621 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
622 fprintf(stderr, "NEW METADATA NEEDED\n");
623 ret = get_new_metadata(id);
624 if (ret < 0) {
625 goto error;
626 }
627 }
628 break;
629 case VIEWER_INDEX_RETRY:
630 fprintf(stderr, "(RETRY)\n");
631 sleep(1);
632 goto retry;
633 case VIEWER_INDEX_HUP:
634 fprintf(stderr, "(HUP)\n");
635 session->streams[id].id = -1ULL;
636 session->streams[id].fd = -1;
637 break;
638 case VIEWER_INDEX_ERR:
639 fprintf(stderr, "(ERR)\n");
640 ret = -1;
641 goto error;
642 default:
643 fprintf(stderr, "SHOULD NOT HAPPEN\n");
644 ret = -1;
645 goto error;
646 }
647
648 error:
649 return ret;
650 }
651
652 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
653 int whence)
654 {
655 struct ctf_stream_pos *pos;
656 struct ctf_file_stream *file_stream;
657 struct packet_index packet_index;
658 int ret;
659
660 pos = ctf_pos(stream_pos);
661 file_stream = container_of(pos, struct ctf_file_stream, pos);
662
663 fprintf(stderr, "BT GET_NEXT_INDEX %d\n", pos->fd);
664 ret = get_next_index(pos->fd, &packet_index);
665 if (ret < 0) {
666 fprintf(stderr, "get_next_index failed\n");
667 return;
668 }
669
670 pos->packet_size = packet_index.packet_size;
671 pos->content_size = packet_index.content_size;
672 pos->mmap_base_offset = 0;
673 pos->offset = 0;
674
675 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
676 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
677 &file_stream->parent, packet_index.timestamp_end);
678
679 if (pos->packet_size == 0) {
680 goto end;
681 }
682
683 fprintf(stderr, "BT GET_DATA_PACKET\n");
684 ret = get_data_packet(pos->fd, be64toh(packet_index.offset),
685 packet_index.packet_size / CHAR_BIT);
686 if (ret < 0) {
687 fprintf(stderr, "get_data_packet failed");
688 return;
689 }
690
691 fprintf(stderr, "BT MMAP %d\n", pos->fd);
692 fprintf(stderr, "packet_size : %lu, offset %lu, content_size %lu, timestamp_end : %lu, real : %lu\n",
693 packet_index.packet_size,
694 packet_index.offset,
695 packet_index.content_size,
696 packet_index.timestamp_end,
697 ctf_get_real_timestamp(
698 &file_stream->parent, packet_index.timestamp_end));
699 if (!pos->base_mma) {
700 pos->base_mma = zmalloc(sizeof(*pos->base_mma));
701 if (!pos->base_mma) {
702 fprintf(stderr, "alloc pos->base_mma\n");
703 return;
704 }
705 }
706
707 mmap_align_set_addr(pos->base_mma, session->streams[pos->fd].mmap_base);
708 if (pos->base_mma == MAP_FAILED) {
709 perror("Error mmaping");
710 return;
711 }
712
713 /* update trace_packet_header and stream_packet_context */
714 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
715 /* Read packet header */
716 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
717 assert(!ret);
718 }
719 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
720 /* Read packet context */
721 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
722 assert(!ret);
723 }
724
725 end:
726 return;
727 }
728
729 int open_trace(struct bt_context **bt_ctx)
730 {
731 struct bt_mmap_stream *new_mmap_stream;
732 struct bt_mmap_stream_list mmap_list;
733 FILE *metadata_fp = NULL;
734 int i;
735 int ret = 0;
736
737 *bt_ctx = bt_context_create();
738 BT_INIT_LIST_HEAD(&mmap_list.head);
739
740 for (i = 0; i < session->stream_count; i++) {
741 int total_metadata = 0;
742
743 if (!session->streams[i].metadata_flag) {
744 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
745 /*
746 * The FD is unused when we handle manually the
747 * packet seek, so we store here the ID of the
748 * stream in our stream list to be able to use it
749 * later.
750 */
751 new_mmap_stream->fd = i;
752 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
753 } else {
754 /* Get all possible metadata before starting */
755 do {
756 ret = get_new_metadata(i);
757 if (ret > 0) {
758 total_metadata += ret;
759 }
760 } while (ret > 0 || total_metadata == 0);
761 metadata_fp = fopen(session->streams[i].path, "r");
762 }
763 }
764
765 if (!metadata_fp) {
766 fprintf(stderr, "No metadata stream opened\n");
767 goto end;
768 }
769
770 ret = bt_context_add_trace(*bt_ctx, NULL, "ctf",
771 ctf_live_packet_seek, &mmap_list, metadata_fp);
772 if (ret < 0) {
773 fprintf(stderr, "Error adding trace\n");
774 goto end;
775 }
776
777 /*
778 begin_pos.type = BT_SEEK_BEGIN;
779 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
780 while ((event = bt_ctf_iter_read_event(iter)) != NULL) {
781 if (!skip) {
782 ret = sout->parent.event_cb(&sout->parent, event->parent->stream);
783 if (ret) {
784 fprintf(stderr, "[error] Writing event failed.\n");
785 goto end;
786 }
787 }
788
789 ret = bt_iter_next(bt_ctf_get_iter(iter));
790 if (ret < 0) {
791 goto end;
792 } else if (ret == EAGAIN) {
793 skip = 1;
794 continue;
795 }
796 skip = 0;
797 }
798 */
799
800 end:
801 return ret;
802 }
803
804 int setup_network_live(char *hostname)
805 {
806 int ret;
807 int session_id;
808
809 session = zmalloc(sizeof(struct live_session));
810 if (!session) {
811 goto error;
812 }
813
814 ret = connect_viewer(hostname);
815 if (ret < 0) {
816 goto error;
817 }
818 fprintf(stderr, "* Connected\n");
819
820 fprintf(stderr, "* Establish connection and version check\n");
821 ret = establish_connection();
822 if (ret < 0) {
823 goto error;
824 }
825
826 fprintf(stderr, "* List sessions\n");
827 ret = list_sessions();
828 if (ret < 0) {
829 fprintf(stderr, "* List error\n");
830 goto error;
831 } else if (ret == 0) {
832 fprintf(stderr, "* No session to attach to, exiting\n");
833 ret = 0;
834 goto end;
835 }
836 session_id = ret;
837
838 do {
839 fprintf(stderr, "* Attach session %d\n", ret);
840 ret = attach_session(session_id);
841 if (ret < 0) {
842 goto error;
843 }
844 } while (session->stream_count == 0);
845
846 end:
847 return 0;
848
849 error:
850 free(session->streams);
851 fprintf(stderr, "* Exiting %d\n", ret);
852 return ret;
853 }
This page took 0.045325 seconds and 4 git commands to generate.