15a3dd2f62ff1971190de83efb398b44c5ba43c1
[lttngtop.git] / src / lttng-live-comm.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 /*
51 #include <formats/ctf/events-private.h>
52 replaced with
53 */
54 #include "network-live.h"
55 #include "lttngtop.h"
56
57 #include "lttng-live-comm.h"
58 #include "lttng-viewer-abi.h"
59
/*
 * Allocate x bytes of zero-initialized memory. Release with free().
 */
#define zmalloc(x) calloc(1, (x))

/*
 * Maximum of a and b after casting both to `type`.
 * Each argument is evaluated twice: do not pass expressions with side
 * effects.
 */
#ifndef max_t
#define max_t(type, a, b)	\
	((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
69
70 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
71 size_t index, int whence);
72 static void add_traces(gpointer key, gpointer value, gpointer user_data);
73 static int del_traces(gpointer key, gpointer value, gpointer user_data);
74
75 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
76 {
77 struct hostent *host;
78 struct sockaddr_in server_addr;
79 int ret;
80
81 host = gethostbyname(ctx->relay_hostname);
82 if (!host) {
83 ret = -1;
84 goto end;
85 }
86
87 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
88 perror("Socket");
89 ret = -1;
90 goto end;
91 }
92
93 server_addr.sin_family = AF_INET;
94 server_addr.sin_port = htons(ctx->port);
95 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
96 bzero(&(server_addr.sin_zero), 8);
97
98 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
99 sizeof(struct sockaddr)) == -1) {
100 perror("Connect");
101 ret = -1;
102 goto end;
103 }
104
105 ret = 0;
106
107 end:
108 return ret;
109 }
110
111 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
112 {
113 struct lttng_viewer_cmd cmd;
114 struct lttng_viewer_connect connect;
115 int ret;
116 ssize_t ret_len;
117
118 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
119 cmd.data_size = sizeof(connect);
120 cmd.cmd_version = 0;
121
122 connect.viewer_session_id = -1ULL; /* will be set on recv */
123 connect.major = htobe32(LTTNG_LIVE_MAJOR);
124 connect.minor = htobe32(LTTNG_LIVE_MINOR);
125 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
126
127 do {
128 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
129 } while (ret_len < 0 && errno == EINTR);
130 if (ret_len < 0) {
131 fprintf(stderr, "[error] Error sending cmd\n");
132 ret = ret_len;
133 goto error;
134 }
135 assert(ret_len == sizeof(cmd));
136
137 do {
138 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
139 } while (ret_len < 0 && errno == EINTR);
140 if (ret_len < 0) {
141 fprintf(stderr, "[error] Error sending version\n");
142 ret = ret_len;
143 goto error;
144 }
145 assert(ret_len == sizeof(connect));
146
147 do {
148 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
149 } while (ret_len < 0 && errno == EINTR);
150 if (ret_len < 0) {
151 fprintf(stderr, "[error] Error receiving version\n");
152 ret = ret_len;
153 goto error;
154 }
155 assert(ret_len == sizeof(connect));
156
157 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
158 be64toh(connect.viewer_session_id));
159 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
160 be32toh(connect.minor));
161
162 ret = 0;
163
164 error:
165 return ret;
166 }
167
168 static
169 void free_session_list(GPtrArray *session_list)
170 {
171 int i;
172 struct lttng_live_relay_session *relay_session;
173
174 for (i = 0; i < session_list->len; i++) {
175 relay_session = g_ptr_array_index(session_list, i);
176 free(relay_session->name);
177 free(relay_session->hostname);
178 }
179 g_ptr_array_free(session_list, TRUE);
180 }
181
182 static
183 void print_session_list(GPtrArray *session_list, const char *path)
184 {
185 int i;
186 struct lttng_live_relay_session *relay_session;
187
188 for (i = 0; i < session_list->len; i++) {
189 relay_session = g_ptr_array_index(session_list, i);
190 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
191 "%u stream(s), %u client(s) connected)\n",
192 path, relay_session->hostname,
193 relay_session->name, relay_session->timer,
194 relay_session->streams, relay_session->clients);
195 }
196 }
197
198 static
199 void update_session_list(GPtrArray *session_list, char *hostname,
200 char *session_name, uint32_t streams, uint32_t clients,
201 uint32_t timer)
202 {
203 int i, found = 0;
204 struct lttng_live_relay_session *relay_session;
205
206 for (i = 0; i < session_list->len; i++) {
207 relay_session = g_ptr_array_index(session_list, i);
208 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
209 strncmp(relay_session->name,
210 session_name, NAME_MAX) == 0) {
211 relay_session->streams += streams;
212 if (relay_session->clients < clients)
213 relay_session->clients = clients;
214 found = 1;
215 break;
216 }
217 }
218 if (found)
219 return;
220
221 relay_session = g_new0(struct lttng_live_relay_session, 1);
222 relay_session->hostname = strndup(hostname, NAME_MAX);
223 relay_session->name = strndup(session_name, NAME_MAX);
224 relay_session->clients = clients;
225 relay_session->streams = streams;
226 relay_session->timer = timer;
227 g_ptr_array_add(session_list, relay_session);
228 }
229
230 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
231 {
232 struct lttng_viewer_cmd cmd;
233 struct lttng_viewer_list_sessions list;
234 struct lttng_viewer_session lsession;
235 int i, ret, sessions_count, print_list = 0;
236 ssize_t ret_len;
237 uint64_t session_id;
238 GPtrArray *session_list = NULL;
239
240 if (strlen(ctx->session_name) == 0) {
241 print_list = 1;
242 session_list = g_ptr_array_new();
243 }
244
245 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
246 cmd.data_size = 0;
247 cmd.cmd_version = 0;
248
249 do {
250 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
251 } while (ret_len < 0 && errno == EINTR);
252 if (ret_len < 0) {
253 fprintf(stderr, "[error] Error sending cmd\n");
254 ret = ret_len;
255 goto error;
256 }
257 assert(ret_len == sizeof(cmd));
258
259 do {
260 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
261 } while (ret_len < 0 && errno == EINTR);
262 if (ret_len < 0) {
263 fprintf(stderr, "[error] Error receiving session list\n");
264 ret = ret_len;
265 goto error;
266 }
267 assert(ret_len == sizeof(list));
268
269 sessions_count = be32toh(list.sessions_count);
270 for (i = 0; i < sessions_count; i++) {
271 do {
272 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
273 } while (ret_len < 0 && errno == EINTR);
274 if (ret_len < 0) {
275 fprintf(stderr, "[error] Error receiving session\n");
276 ret = ret_len;
277 goto error;
278 }
279 assert(ret_len == sizeof(lsession));
280 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
281 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
282 session_id = be64toh(lsession.id);
283
284 if (print_list) {
285 update_session_list(session_list,
286 lsession.hostname,
287 lsession.session_name,
288 be32toh(lsession.streams),
289 be32toh(lsession.clients),
290 be32toh(lsession.live_timer));
291 } else {
292 if ((strncmp(lsession.session_name, ctx->session_name,
293 NAME_MAX) == 0) && (strncmp(lsession.hostname,
294 ctx->traced_hostname, NAME_MAX) == 0)) {
295 printf_verbose("Reading from session %" PRIu64 "\n",
296 session_id);
297 g_array_append_val(ctx->session_ids,
298 session_id);
299 }
300 }
301 }
302
303 if (print_list) {
304 print_session_list(session_list, path);
305 free_session_list(session_list);
306 }
307
308 ret = 0;
309
310 error:
311 return ret;
312 }
313
314 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
315 uint64_t ctf_trace_id)
316 {
317 struct lttng_live_ctf_trace *trace;
318 int ret = 0;
319
320 trace = g_hash_table_lookup(stream->session->ctf_traces,
321 (gpointer) ctf_trace_id);
322 if (!trace) {
323 trace = g_new0(struct lttng_live_ctf_trace, 1);
324 trace->ctf_trace_id = ctf_trace_id;
325 trace->streams = g_ptr_array_new();
326 g_hash_table_insert(stream->session->ctf_traces,
327 (gpointer) ctf_trace_id,
328 trace);
329 }
330 if (stream->metadata_flag)
331 trace->metadata_stream = stream;
332
333 stream->ctf_trace = trace;
334 g_ptr_array_add(trace->streams, stream);
335
336 return ret;
337 }
338
339 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
340 {
341 struct lttng_viewer_cmd cmd;
342 struct lttng_viewer_attach_session_request rq;
343 struct lttng_viewer_attach_session_response rp;
344 struct lttng_viewer_stream stream;
345 int ret, i;
346 ssize_t ret_len;
347
348 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
349 cmd.data_size = sizeof(rq);
350 cmd.cmd_version = 0;
351
352 memset(&rq, 0, sizeof(rq));
353 rq.session_id = htobe64(id);
354 // TODO: add cmd line parameter to select seek beginning
355 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
356 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
357
358 do {
359 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
360 } while (ret_len < 0 && errno == EINTR);
361 if (ret_len < 0) {
362 fprintf(stderr, "[error] Error sending cmd\n");
363 ret = ret_len;
364 goto error;
365 }
366 assert(ret_len == sizeof(cmd));
367
368 do {
369 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
370 } while (ret_len < 0 && errno == EINTR);
371 if (ret_len < 0) {
372 fprintf(stderr, "[error] Error sending attach request\n");
373 ret = ret_len;
374 goto error;
375 }
376 assert(ret_len == sizeof(rq));
377
378 do {
379 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
380 } while (ret_len < 0 && errno == EINTR);
381 if (ret_len < 0) {
382 fprintf(stderr, "[error] Error receiving attach response\n");
383 ret = ret_len;
384 goto error;
385 }
386 assert(ret_len == sizeof(rp));
387
388 switch(be32toh(rp.status)) {
389 case LTTNG_VIEWER_ATTACH_OK:
390 break;
391 case LTTNG_VIEWER_ATTACH_UNK:
392 ret = -LTTNG_VIEWER_ATTACH_UNK;
393 goto end;
394 case LTTNG_VIEWER_ATTACH_ALREADY:
395 fprintf(stderr, "[error] Already a viewer attached\n");
396 ret = -1;
397 goto end;
398 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
399 fprintf(stderr, "[error] Not a live session\n");
400 ret = -1;
401 goto end;
402 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
403 fprintf(stderr, "[error] Wrong seek parameter\n");
404 ret = -1;
405 goto end;
406 default:
407 fprintf(stderr, "[error] Unknown attach return code %u\n",
408 be32toh(rp.status));
409 ret = -1;
410 goto end;
411 }
412 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
413 ret = -1;
414 goto end;
415 }
416
417 ctx->session->stream_count += be32toh(rp.streams_count);
418 /*
419 * When the session is created but not started, we do an active wait
420 * until it starts. It allows the viewer to start processing the trace
421 * as soon as the session starts.
422 */
423 if (ctx->session->stream_count == 0) {
424 ret = 0;
425 goto end;
426 }
427 printf_verbose("Waiting for %u streams:\n",
428 be32toh(rp.streams_count));
429 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
430 ctx->session->stream_count);
431 for (i = 0; i < be32toh(rp.streams_count); i++) {
432 do {
433 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
434 } while (ret_len < 0 && errno == EINTR);
435 if (ret_len < 0) {
436 fprintf(stderr, "[error] Error receiving stream\n");
437 ret = ret_len;
438 goto error;
439 }
440 assert(ret_len == sizeof(stream));
441 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
442 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
443
444 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
445 be64toh(stream.id), stream.path_name,
446 stream.channel_name);
447 ctx->session->streams[i].id = be64toh(stream.id);
448 ctx->session->streams[i].session = ctx->session;
449
450 ctx->session->streams[i].first_read = 1;
451 ctx->session->streams[i].mmap_size = 0;
452
453 if (be32toh(stream.metadata_flag)) {
454 char *path;
455
456 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
457 if (!path) {
458 perror("strdup");
459 ret = -1;
460 goto error;
461 }
462 if (!mkdtemp(path)) {
463 perror("mkdtemp");
464 free(path);
465 ret = -1;
466 goto error;
467 }
468 ctx->session->streams[i].metadata_flag = 1;
469 snprintf(ctx->session->streams[i].path,
470 sizeof(ctx->session->streams[i].path),
471 "%s/%s", path,
472 stream.channel_name);
473 ret = open(ctx->session->streams[i].path,
474 O_WRONLY | O_CREAT | O_TRUNC,
475 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
476 if (ret < 0) {
477 perror("open");
478 free(path);
479 goto error;
480 }
481 ctx->session->streams[i].fd = ret;
482 free(path);
483 }
484 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
485 be64toh(stream.ctf_trace_id));
486 if (ret < 0) {
487 goto error;
488 }
489
490 }
491 ret = 0;
492
493 end:
494 error:
495 return ret;
496 }
497
498 static
499 int ask_new_streams(struct lttng_live_ctx *ctx)
500 {
501 int i, ret = 0;
502 uint64_t id;
503
504 restart:
505 for (i = 0; i < ctx->session_ids->len; i++) {
506 id = g_array_index(ctx->session_ids, uint64_t, i);
507 ret = lttng_live_get_new_streams(ctx, id);
508 printf_verbose("Asking for new streams returns %d\n",
509 ret);
510 if (ret < 0) {
511 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
512 printf_verbose("Session %" PRIu64 " closed\n",
513 id);
514 /*
515 * The streams have already been closed during
516 * the reading, so we only need to get rid of
517 * the trace in our internal table of sessions.
518 */
519 g_array_remove_index(ctx->session_ids, i);
520 /*
521 * We can't continue iterating on the g_array
522 * after a remove, we have to start again.
523 */
524 goto restart;
525 } else {
526 ret = -1;
527 goto end;
528 }
529 }
530 }
531
532 end:
533 return ret;
534 }
535
536 static
537 int get_data_packet(struct lttng_live_ctx *ctx,
538 struct ctf_stream_pos *pos,
539 struct lttng_live_viewer_stream *stream, uint64_t offset,
540 uint64_t len)
541 {
542 struct lttng_viewer_cmd cmd;
543 struct lttng_viewer_get_packet rq;
544 struct lttng_viewer_trace_packet rp;
545 ssize_t ret_len;
546 int ret;
547
548 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
549 cmd.data_size = sizeof(rq);
550 cmd.cmd_version = 0;
551
552 memset(&rq, 0, sizeof(rq));
553 rq.stream_id = htobe64(stream->id);
554 /* Already in big endian. */
555 rq.offset = offset;
556 rq.len = htobe32(len);
557
558 do {
559 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
560 } while (ret_len < 0 && errno == EINTR);
561 if (ret_len < 0) {
562 fprintf(stderr, "[error] Error sending cmd\n");
563 ret = ret_len;
564 goto error;
565 }
566 assert(ret_len == sizeof(cmd));
567
568 do {
569 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
570 } while (ret_len < 0 && errno == EINTR);
571 if (ret_len < 0) {
572 fprintf(stderr, "[error] Error sending get_data_packet request\n");
573 ret = ret_len;
574 goto error;
575 }
576 assert(ret_len == sizeof(rq));
577
578 do {
579 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
580 } while (ret_len < 0 && errno == EINTR);
581 if (ret_len < 0) {
582 fprintf(stderr, "[error] Error receiving data response\n");
583 ret = ret_len;
584 goto error;
585 }
586 if (ret_len != sizeof(rp)) {
587 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
588 ", received %" PRId64 "\n", ret_len,
589 sizeof(rp));
590 ret = -1;
591 goto error;
592 }
593
594 rp.flags = be32toh(rp.flags);
595
596 switch (be32toh(rp.status)) {
597 case LTTNG_VIEWER_GET_PACKET_OK:
598 len = be32toh(rp.len);
599 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
600 "\n", len);
601 break;
602 case LTTNG_VIEWER_GET_PACKET_RETRY:
603 printf_verbose("get_data_packet: retry\n");
604 ret = -1;
605 goto end;
606 case LTTNG_VIEWER_GET_PACKET_ERR:
607 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
608 printf_verbose("get_data_packet: new metadata needed\n");
609 ret = 0;
610 goto end;
611 }
612 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
613 printf_verbose("get_data_packet: new streams needed\n");
614 ret = ask_new_streams(ctx);
615 if (ret < 0)
616 goto error;
617 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
618 ctx->bt_ctx);
619 ret = 0;
620 goto end;
621 }
622 fprintf(stderr, "[error] get_data_packet: error\n");
623 ret = -1;
624 goto end;
625 case LTTNG_VIEWER_GET_PACKET_EOF:
626 ret = -2;
627 goto error;
628 default:
629 printf_verbose("get_data_packet: unknown\n");
630 assert(0);
631 ret = -1;
632 goto end;
633 }
634
635 if (len <= 0) {
636 ret = -1;
637 goto end;
638 }
639
640 if (len > stream->mmap_size) {
641 uint64_t new_size;
642
643 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
644 if (pos->base_mma) {
645 /* unmap old base */
646 ret = munmap_align(pos->base_mma);
647 if (ret) {
648 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
649 strerror(errno));
650 ret = -1;
651 goto error;
652 }
653 pos->base_mma = NULL;
654 }
655 pos->base_mma = mmap_align(new_size,
656 PROT_READ | PROT_WRITE,
657 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
658 if (pos->base_mma == MAP_FAILED) {
659 fprintf(stderr, "[error] mmap error %s.\n",
660 strerror(errno));
661 pos->base_mma = NULL;
662 ret = -1;
663 goto error;
664 }
665
666 stream->mmap_size = new_size;
667 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
668 stream->mmap_size);
669 }
670
671 do {
672 ret_len = recv(ctx->control_sock,
673 mmap_align_addr(pos->base_mma), len,
674 MSG_WAITALL);
675 } while (ret_len < 0 && errno == EINTR);
676 if (ret_len < 0) {
677 fprintf(stderr, "[error] Error receiving trace packet\n");
678 ret = ret_len;
679 goto error;
680 }
681 assert(ret_len == len);
682
683 end:
684 error:
685 return ret;
686 }
687
688 /*
689 * Return number of metadata bytes written or a negative value on error.
690 */
691 static
692 int get_new_metadata(struct lttng_live_ctx *ctx,
693 struct lttng_live_viewer_stream *viewer_stream,
694 uint64_t *metadata_len)
695 {
696 uint64_t len = 0;
697 int ret;
698 struct lttng_viewer_cmd cmd;
699 struct lttng_viewer_get_metadata rq;
700 struct lttng_viewer_metadata_packet rp;
701 struct lttng_live_viewer_stream *metadata_stream;
702 char *data = NULL;
703 ssize_t ret_len;
704
705 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
706 cmd.data_size = sizeof(rq);
707 cmd.cmd_version = 0;
708
709 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
710 rq.stream_id = htobe64(metadata_stream->id);
711
712 do {
713 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
714 } while (ret_len < 0 && errno == EINTR);
715 if (ret_len < 0) {
716 fprintf(stderr, "[error] Error sending cmd\n");
717 ret = ret_len;
718 goto error;
719 }
720 assert(ret_len == sizeof(cmd));
721
722 do {
723 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
724 } while (ret_len < 0 && errno == EINTR);
725 if (ret_len < 0) {
726 fprintf(stderr, "[error] Error sending get_metadata request\n");
727 ret = ret_len;
728 goto error;
729 }
730 assert(ret_len == sizeof(rq));
731
732 do {
733 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
734 } while (ret_len < 0 && errno == EINTR);
735 if (ret_len < 0) {
736 fprintf(stderr, "[error] Error receiving metadata response\n");
737 ret = ret_len;
738 goto error;
739 }
740 assert(ret_len == sizeof(rp));
741
742 switch (be32toh(rp.status)) {
743 case LTTNG_VIEWER_METADATA_OK:
744 printf_verbose("get_metadata : OK\n");
745 break;
746 case LTTNG_VIEWER_NO_NEW_METADATA:
747 printf_verbose("get_metadata : NO NEW\n");
748 ret = -1;
749 goto end;
750 case LTTNG_VIEWER_METADATA_ERR:
751 printf_verbose("get_metadata : ERR\n");
752 ret = -1;
753 goto end;
754 default:
755 printf_verbose("get_metadata : UNKNOWN\n");
756 ret = -1;
757 goto end;
758 }
759
760 len = be64toh(rp.len);
761 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
762 if (len <= 0) {
763 ret = -1;
764 goto end;
765 }
766
767 data = zmalloc(len);
768 if (!data) {
769 perror("relay data zmalloc");
770 ret = -1;
771 goto error;
772 }
773 do {
774 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
775 } while (ret_len < 0 && errno == EINTR);
776 if (ret_len < 0) {
777 fprintf(stderr, "[error] Error receiving trace packet\n");
778 ret = ret_len;
779 free(data);
780 goto error;
781 }
782 assert(ret_len == len);
783
784 do {
785 ret_len = write(metadata_stream->fd, data, len);
786 } while (ret_len < 0 && errno == EINTR);
787 if (ret_len < 0) {
788 free(data);
789 ret = ret_len;
790 goto error;
791 }
792 assert(ret_len == len);
793
794 free(data);
795
796 *metadata_len = len;
797 ret = 0;
798 end:
799 error:
800 return ret;
801 }
802
803 /*
804 * Get one index for a stream.
805 *
806 * Returns 0 on success or a negative value on error.
807 */
808 static
809 int get_next_index(struct lttng_live_ctx *ctx,
810 struct lttng_live_viewer_stream *viewer_stream,
811 struct packet_index *index)
812 {
813 struct lttng_viewer_cmd cmd;
814 struct lttng_viewer_get_next_index rq;
815 struct lttng_viewer_index rp;
816 int ret;
817 uint64_t metadata_len;
818 ssize_t ret_len;
819
820 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
821 cmd.data_size = sizeof(rq);
822 cmd.cmd_version = 0;
823
824 memset(&rq, 0, sizeof(rq));
825 rq.stream_id = htobe64(viewer_stream->id);
826
827 retry:
828 do {
829 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
830 } while (ret_len < 0 && errno == EINTR);
831 if (ret_len < 0) {
832 fprintf(stderr, "[error] Error sending cmd\n");
833 ret = ret_len;
834 goto error;
835 }
836 assert(ret_len == sizeof(cmd));
837
838 do {
839 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
840 } while (ret_len < 0 && errno == EINTR);
841 if (ret_len < 0) {
842 fprintf(stderr, "[error] Error sending get_next_index request\n");
843 ret = ret_len;
844 goto error;
845 }
846 assert(ret_len == sizeof(rq));
847
848 do {
849 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
850 } while (ret_len < 0 && errno == EINTR);
851 if (ret_len < 0) {
852 fprintf(stderr, "[error] Error receiving index response\n");
853 ret = ret_len;
854 goto error;
855 }
856 assert(ret_len == sizeof(rp));
857
858 rp.flags = be32toh(rp.flags);
859
860 switch (be32toh(rp.status)) {
861 case LTTNG_VIEWER_INDEX_INACTIVE:
862 printf_verbose("get_next_index: inactive\n");
863 memset(index, 0, sizeof(struct packet_index));
864 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
865 break;
866 case LTTNG_VIEWER_INDEX_OK:
867 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
868 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
869 index->offset = be64toh(rp.offset);
870 index->packet_size = be64toh(rp.packet_size);
871 index->content_size = be64toh(rp.content_size);
872 index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
873 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
874 index->events_discarded = be64toh(rp.events_discarded);
875
876 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
877 printf_verbose("get_next_index: new metadata needed\n");
878 ret = get_new_metadata(ctx, viewer_stream,
879 &metadata_len);
880 if (ret < 0) {
881 goto error;
882 }
883 }
884 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
885 ret = ask_new_streams(ctx);
886 if (ret < 0)
887 goto error;
888 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
889 ctx->bt_ctx);
890 }
891 break;
892 case LTTNG_VIEWER_INDEX_RETRY:
893 printf_verbose("get_next_index: retry\n");
894 sleep(0.1);
895 goto retry;
896 case LTTNG_VIEWER_INDEX_HUP:
897 printf_verbose("get_next_index: stream hung up\n");
898 viewer_stream->id = -1ULL;
899 viewer_stream->fd = -1;
900 index->offset = EOF;
901 ctx->session->stream_count--;
902 break;
903 case LTTNG_VIEWER_INDEX_ERR:
904 fprintf(stderr, "[error] get_next_index: error\n");
905 ret = -1;
906 goto error;
907 default:
908 fprintf(stderr, "[error] get_next_index: unkwown value\n");
909 ret = -1;
910 goto error;
911 }
912
913 ret = 0;
914
915 error:
916 return ret;
917 }
918
919 static
920 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
921 int whence)
922 {
923 struct ctf_stream_pos *pos;
924 struct ctf_file_stream *file_stream;
925 struct packet_index *prev_index = NULL, *cur_index;
926 struct lttng_live_viewer_stream *viewer_stream;
927 struct lttng_live_session *session;
928 int ret;
929
930 retry:
931 pos = ctf_pos(stream_pos);
932 file_stream = container_of(pos, struct ctf_file_stream, pos);
933 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
934 session = viewer_stream->session;
935
936 switch (pos->packet_index->len) {
937 case 0:
938 g_array_set_size(pos->packet_index, 1);
939 cur_index = &g_array_index(pos->packet_index,
940 struct packet_index, 0);
941 break;
942 case 1:
943 g_array_set_size(pos->packet_index, 2);
944 prev_index = &g_array_index(pos->packet_index,
945 struct packet_index, 0);
946 cur_index = &g_array_index(pos->packet_index,
947 struct packet_index, 1);
948 break;
949 case 2:
950 g_array_index(pos->packet_index,
951 struct packet_index, 0) =
952 g_array_index(pos->packet_index,
953 struct packet_index, 1);
954 prev_index = &g_array_index(pos->packet_index,
955 struct packet_index, 0);
956 cur_index = &g_array_index(pos->packet_index,
957 struct packet_index, 1);
958 break;
959 default:
960 abort();
961 break;
962 }
963
964 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
965 ret = get_next_index(session->ctx, viewer_stream, cur_index);
966 if (ret < 0) {
967 pos->offset = EOF;
968 fprintf(stderr, "[error] get_next_index failed\n");
969 return;
970 }
971
972 pos->packet_size = cur_index->packet_size;
973 pos->content_size = cur_index->content_size;
974 pos->mmap_base_offset = 0;
975 if (cur_index->offset == EOF) {
976 pos->offset = EOF;
977 } else {
978 pos->offset = 0;
979 }
980
981 if (cur_index->content_size == 0) {
982 file_stream->parent.cycles_timestamp =
983 cur_index->ts_cycles.timestamp_end;
984 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
985 &file_stream->parent,
986 cur_index->ts_cycles.timestamp_end);
987 } else {
988 /* Convert the timestamps and append to the real_index. */
989 cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
990 &file_stream->parent,
991 cur_index->ts_cycles.timestamp_begin);
992 cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
993 &file_stream->parent,
994 cur_index->ts_cycles.timestamp_end);
995
996 ctf_update_current_packet_index(&file_stream->parent,
997 prev_index, cur_index);
998
999 file_stream->parent.cycles_timestamp =
1000 cur_index->ts_cycles.timestamp_begin;
1001 file_stream->parent.real_timestamp =
1002 cur_index->ts_real.timestamp_begin;
1003 }
1004
1005 if (pos->packet_size == 0 || pos->offset == EOF) {
1006 goto end;
1007 }
1008
1009 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
1010 viewer_stream->id);
1011 ret = get_data_packet(session->ctx, pos, viewer_stream,
1012 be64toh(cur_index->offset),
1013 cur_index->packet_size / CHAR_BIT);
1014 if (ret == -2) {
1015 goto retry;
1016 } else if (ret < 0) {
1017 pos->offset = EOF;
1018 fprintf(stderr, "[error] get_data_packet failed\n");
1019 return;
1020 }
1021
1022 printf_verbose("Index received : packet_size : %" PRIu64
1023 ", offset %" PRIu64 ", content_size %" PRIu64
1024 ", timestamp_end : %" PRIu64 "\n",
1025 cur_index->packet_size, cur_index->offset,
1026 cur_index->content_size,
1027 cur_index->ts_cycles.timestamp_end);
1028
1029 /* update trace_packet_header and stream_packet_context */
1030 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
1031 /* Read packet header */
1032 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
1033 if (ret) {
1034 pos->offset = EOF;
1035 fprintf(stderr, "[error] trace packet header read failed\n");
1036 goto end;
1037 }
1038 }
1039 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
1040 /* Read packet context */
1041 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
1042 if (ret) {
1043 pos->offset = EOF;
1044 fprintf(stderr, "[error] stream packet context read failed\n");
1045 goto end;
1046 }
1047 }
1048 pos->data_offset = pos->offset;
1049
1050 end:
1051 return;
1052 }
1053
1054 int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1055 {
1056 struct lttng_viewer_cmd cmd;
1057 struct lttng_viewer_create_session_response resp;
1058 int ret;
1059 ssize_t ret_len;
1060
1061 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1062 cmd.data_size = 0;
1063 cmd.cmd_version = 0;
1064
1065 do {
1066 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1067 } while (ret_len < 0 && errno == EINTR);
1068 if (ret_len < 0) {
1069 fprintf(stderr, "[error] Error sending cmd\n");
1070 ret = ret_len;
1071 goto error;
1072 }
1073 assert(ret_len == sizeof(cmd));
1074
1075 do {
1076 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1077 } while (ret_len < 0 && errno == EINTR);
1078 if (ret_len < 0) {
1079 fprintf(stderr, "[error] Error receiving create session reply\n");
1080 ret = ret_len;
1081 goto error;
1082 }
1083 assert(ret_len == sizeof(resp));
1084
1085 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1086 fprintf(stderr, "[error] Error creating viewer session\n");
1087 ret = -1;
1088 goto error;
1089 }
1090 ret = 0;
1091
1092 error:
1093 return ret;
1094 }
1095
1096 static
1097 int del_traces(gpointer key, gpointer value, gpointer user_data)
1098 {
1099 struct bt_context *bt_ctx = user_data;
1100 struct lttng_live_ctf_trace *trace = value;
1101 int ret;
1102
1103 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1104 if (ret < 0)
1105 fprintf(stderr, "[error] removing trace from context\n");
1106
1107 /* remove the key/value pair from the HT. */
1108 return 1;
1109 }
1110
1111 static
1112 void add_traces(gpointer key, gpointer value, gpointer user_data)
1113 {
1114 int i, ret, total_metadata = 0;
1115 uint64_t metadata_len;
1116 struct bt_context *bt_ctx = user_data;
1117 struct lttng_live_ctf_trace *trace = value;
1118 struct lttng_live_viewer_stream *stream;
1119 struct bt_mmap_stream *new_mmap_stream;
1120 struct bt_mmap_stream_list mmap_list;
1121 struct lttng_live_ctx *ctx = NULL;
1122
1123 /*
1124 * We don't know how many streams we will receive for a trace, so
1125 * once we are done receiving the traces, we add all the traces
1126 * received to the bt_context.
1127 * We can receive streams during the attach command or the
1128 * get_new_streams, so we have to make sure not to add multiple
1129 * times the same traces.
1130 * If a trace is already in the context, we just skip this function.
1131 */
1132 if (trace->in_use)
1133 return;
1134
1135 BT_INIT_LIST_HEAD(&mmap_list.head);
1136
1137 for (i = 0; i < trace->streams->len; i++) {
1138 stream = g_ptr_array_index(trace->streams, i);
1139 ctx = stream->session->ctx;
1140
1141 if (!stream->metadata_flag) {
1142 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1143 new_mmap_stream->priv = (void *) stream;
1144 new_mmap_stream->fd = -1;
1145 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1146 } else {
1147 /* Get all possible metadata before starting */
1148 do {
1149 ret = get_new_metadata(ctx, stream,
1150 &metadata_len);
1151 if (ret == 0) {
1152 total_metadata += metadata_len;
1153 }
1154 } while (ret == 0 || total_metadata == 0);
1155 trace->metadata_fp = fopen(stream->path, "r");
1156 }
1157 }
1158
1159 if (!trace->metadata_fp) {
1160 fprintf(stderr, "[error] No metadata stream opened\n");
1161 goto end_free;
1162 }
1163
1164 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1165 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1166 if (ret < 0) {
1167 fprintf(stderr, "[error] Error adding trace\n");
1168 goto end_free;
1169 }
1170
1171 if (bt_ctx->current_iterator) {
1172 struct bt_trace_descriptor *td;
1173 struct bt_trace_handle *handle;
1174
1175 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1176 bt_ctx->trace_handles,
1177 (gpointer) (unsigned long) ret);
1178 td = handle->td;
1179 bt_iter_add_trace(bt_ctx->current_iterator, td);
1180 }
1181
1182 trace->trace_id = ret;
1183 trace->in_use = 1;
1184
1185 goto end;
1186
1187 end_free:
1188 bt_context_put(bt_ctx);
1189 end:
1190 return;
1191 }
1192
1193 int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
1194 {
1195 struct lttng_viewer_cmd cmd;
1196 struct lttng_viewer_new_streams_request rq;
1197 struct lttng_viewer_new_streams_response rp;
1198 struct lttng_viewer_stream stream;
1199 int ret, i;
1200 ssize_t ret_len;
1201 uint32_t stream_count;
1202
1203 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1204 cmd.data_size = sizeof(rq);
1205 cmd.cmd_version = 0;
1206
1207 memset(&rq, 0, sizeof(rq));
1208 rq.session_id = htobe64(id);
1209
1210 do {
1211 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1212 } while (ret_len < 0 && errno == EINTR);
1213 if (ret_len < 0) {
1214 fprintf(stderr, "[error] Error sending cmd\n");
1215 ret = ret_len;
1216 goto error;
1217 }
1218 assert(ret_len == sizeof(cmd));
1219
1220 do {
1221 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
1222 } while (ret_len < 0 && errno == EINTR);
1223 if (ret_len < 0) {
1224 fprintf(stderr, "[error] Error sending get_new_streams request\n");
1225 ret = ret_len;
1226 goto error;
1227 }
1228 assert(ret_len == sizeof(rq));
1229
1230 do {
1231 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1232 } while (ret_len < 0 && errno == EINTR);
1233 if (ret_len < 0) {
1234 fprintf(stderr, "[error] Error receiving get_new_streams response\n");
1235 ret = ret_len;
1236 goto error;
1237 }
1238 assert(ret_len == sizeof(rp));
1239
1240 switch(be32toh(rp.status)) {
1241 case LTTNG_VIEWER_NEW_STREAMS_OK:
1242 break;
1243 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1244 ret = 0;
1245 goto end;
1246 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1247 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1248 goto end;
1249 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1250 fprintf(stderr, "[error] get_new_streams error\n");
1251 ret = -1;
1252 goto end;
1253 default:
1254 fprintf(stderr, "[error] Unknown return code %u\n",
1255 be32toh(rp.status));
1256 ret = -1;
1257 goto end;
1258 }
1259
1260 stream_count = be32toh(rp.streams_count);
1261 ctx->session->stream_count += stream_count;
1262 /*
1263 * When the session is created but not started, we do an active wait
1264 * until it starts. It allows the viewer to start processing the trace
1265 * as soon as the session starts.
1266 */
1267 if (ctx->session->stream_count == 0) {
1268 ret = 0;
1269 goto end;
1270 }
1271 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1272 ctx->session->stream_count);
1273 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1274 ctx->session->stream_count);
1275 for (i = 0; i < stream_count; i++) {
1276 do {
1277 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1278 } while (ret_len < 0 && errno == EINTR);
1279 if (ret_len < 0) {
1280 fprintf(stderr, "[error] Error receiving stream\n");
1281 ret = ret_len;
1282 goto error;
1283 }
1284 assert(ret_len == sizeof(stream));
1285 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1286 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1287
1288 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1289 be64toh(stream.id), stream.path_name,
1290 stream.channel_name);
1291 ctx->session->streams[i].id = be64toh(stream.id);
1292 ctx->session->streams[i].session = ctx->session;
1293
1294 ctx->session->streams[i].first_read = 1;
1295 ctx->session->streams[i].mmap_size = 0;
1296
1297 if (be32toh(stream.metadata_flag)) {
1298 char *path;
1299
1300 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1301 if (!path) {
1302 perror("strdup");
1303 ret = -1;
1304 goto error;
1305 }
1306 if (!mkdtemp(path)) {
1307 perror("mkdtemp");
1308 free(path);
1309 ret = -1;
1310 goto error;
1311 }
1312 ctx->session->streams[i].metadata_flag = 1;
1313 snprintf(ctx->session->streams[i].path,
1314 sizeof(ctx->session->streams[i].path),
1315 "%s/%s", path,
1316 stream.channel_name);
1317 ret = open(ctx->session->streams[i].path,
1318 O_WRONLY | O_CREAT | O_TRUNC,
1319 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1320 if (ret < 0) {
1321 perror("open");
1322 free(path);
1323 goto error;
1324 }
1325 ctx->session->streams[i].fd = ret;
1326 free(path);
1327 }
1328 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1329 be64toh(stream.ctf_trace_id));
1330 if (ret < 0) {
1331 goto error;
1332 }
1333
1334 }
1335 ret = 0;
1336
1337 end:
1338 error:
1339 return ret;
1340 }
1341
1342 void lttng_live_read(struct lttng_live_ctx *ctx)
1343 {
1344 int ret, i;
1345 #if 0
1346 struct bt_ctf_iter *iter;
1347 const struct bt_ctf_event *event;
1348 struct bt_iter_pos begin_pos;
1349 struct bt_trace_descriptor *td_write;
1350 struct bt_format *fmt_write;
1351 struct ctf_text_stream_pos *sout;
1352 #endif
1353 uint64_t id;
1354
1355 ctx->bt_ctx = bt_context_create();
1356 if (!ctx->bt_ctx) {
1357 fprintf(stderr, "[error] bt_context_create allocation\n");
1358 goto end;
1359 }
1360
1361 #if 0
1362 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1363 if (!fmt_write) {
1364 fprintf(stderr, "[error] ctf-text error\n");
1365 goto end;
1366 }
1367
1368 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1369 if (!td_write) {
1370 fprintf(stderr, "[error] Error opening output trace\n");
1371 goto end_free;
1372 }
1373
1374 sout = container_of(td_write, struct ctf_text_stream_pos,
1375 trace_descriptor);
1376 if (!sout->parent.event_cb)
1377 goto end_free;
1378 #endif
1379
1380 ret = lttng_live_create_viewer_session(ctx);
1381 if (ret < 0) {
1382 goto end_free;
1383 }
1384
1385 for (i = 0; i < ctx->session_ids->len; i++) {
1386 id = g_array_index(ctx->session_ids, uint64_t, i);
1387 printf_verbose("Attaching to session %lu\n", id);
1388 ret = lttng_live_attach_session(ctx, id);
1389 printf_verbose("Attaching session returns %d\n", ret);
1390 if (ret < 0) {
1391 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1392 fprintf(stderr, "[error] Unknown session ID\n");
1393 }
1394 goto end_free;
1395 }
1396 }
1397
1398 /*
1399 * As long as the session is active, we try to get new streams.
1400 */
1401 #if 0
1402 for (;;) {
1403 int flags;
1404 #endif
1405
1406 while (!ctx->session->stream_count) {
1407 if (ctx->session_ids->len == 0)
1408 goto end_free;
1409 ret = ask_new_streams(ctx);
1410 if (ret < 0)
1411 goto end_free;
1412 }
1413
1414 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1415 ctx->bt_ctx);
1416
1417 #if 0
1418 begin_pos.type = BT_SEEK_BEGIN;
1419 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1420 if (!iter) {
1421 fprintf(stderr, "[error] Iterator creation error\n");
1422 goto end;
1423 }
1424
1425 for (;;) {
1426 event = bt_ctf_iter_read_event_flags(iter, &flags);
1427 if (!(flags & BT_ITER_FLAG_RETRY)) {
1428 if (!event) {
1429 /* End of trace */
1430 break;
1431 }
1432 ret = sout->parent.event_cb(&sout->parent,
1433 event->parent->stream);
1434 if (ret) {
1435 fprintf(stderr, "[error] Writing "
1436 "event failed.\n");
1437 goto end_free;
1438 }
1439 }
1440 ret = bt_iter_next(bt_ctf_get_iter(iter));
1441 if (ret < 0) {
1442 goto end_free;
1443 }
1444 }
1445 bt_ctf_iter_destroy(iter);
1446 #endif
1447 ret = check_requirements(ctx->bt_ctx);
1448 if (ret < 0 && !valid_trace) {
1449 fprintf(stderr, "[error] some mandatory contexts "
1450 "were missing, exiting.\n");
1451 goto end;
1452 }
1453
1454 if (!opt_textdump) {
1455 #ifdef HAVE_LIBNCURSES
1456 pthread_create(&display_thread, NULL, ncurses_display,
1457 (void *) NULL);
1458 pthread_create(&timer_thread, NULL, refresh_thread,
1459 (void *) NULL);
1460 #else
1461 printf("Ncurses support not compiled, please install "
1462 "the missing dependencies and recompile\n");
1463 goto end_free;
1464 #endif
1465 }
1466 iter_trace(ctx->bt_ctx);
1467 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1468 del_traces, ctx->bt_ctx);
1469 ctx->session->stream_count = 0;
1470 #if 0
1471 }
1472 #endif
1473
1474 end_free:
1475 bt_context_put(ctx->bt_ctx);
1476 end:
1477 return;
1478 }
This page took 0.087738 seconds and 3 git commands to generate.