From 16b22a0fe150f4923b9902e802bbf28bacce8d0e Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Fri, 3 Aug 2012 11:45:44 -0400 Subject: [PATCH] in progress create session, register consumer, recv fds, splice metadata, all working Signed-off-by: Julien Desfossez --- src/Makefile.am | 2 +- src/lttngtop.c | 316 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 288 insertions(+), 30 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 382cfc2..9341389 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -16,4 +16,4 @@ lttngtop_SOURCES = \ cputop.c \ iostreamtop.c -lttngtop_LDADD = -lbabeltrace -lbabeltrace-ctf +lttngtop_LDADD = -lbabeltrace -lbabeltrace-ctf -llttngtop-helper -llttng-ctl -lurcu diff --git a/src/lttngtop.c b/src/lttngtop.c index fa412ae..f15eea5 100644 --- a/src/lttngtop.c +++ b/src/lttngtop.c @@ -37,6 +37,9 @@ #include #include #include +#include +#include +#include #include "lttngtoptypes.h" #include "cputop.h" @@ -56,6 +59,12 @@ unsigned long refresh_display = 1 * NSEC_PER_SEC; unsigned long last_display_update = 0; int quit = 0; +/* LIVE */ +/* list of FDs available for being read with snapshots */ +struct mmap_stream_list mmap_list; +GPtrArray *lttng_consumer_stream_array; +int sessiond_metadata, consumerd_metadata; + enum { OPT_NONE = 0, OPT_HELP, @@ -337,11 +346,6 @@ static int parse_options(int argc, char **argv) poptContext pc; int opt, ret = 0; - if (argc == 1) { - usage(stdout); - return 1; /* exit cleanly */ - } - pc = poptGetContext(NULL, argc, (const char **) argv, long_options, 0); poptReadDefaultConfig(pc, 0); @@ -358,10 +362,7 @@ static int parse_options(int argc, char **argv) } opt_input_path = poptGetArg(pc); - if (!opt_input_path) { - ret = -EINVAL; - goto end; - } + end: if (pc) { poptFreeContext(pc); @@ -633,6 +634,256 @@ int check_requirements(struct bt_context *ctx) return ret; } +ssize_t read_subbuffer(struct lttng_consumer_stream *kconsumerd_fd, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + int infd = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + + if (helper_get_lttng_consumer_stream_output(kconsumerd_fd) == LTTNG_EVENT_SPLICE) { + /* Get the next subbuffer */ + printf("get_next : %d\n", infd); + err = helper_kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + /* read the whole subbuffer */ + err = helper_kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + printf("len : %ld\n", len); + + /* splice the subbuffer to the tracefile */ + ret = helper_lttng_consumer_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + fprintf(stderr,"Error splicing to tracefile\n"); + } + printf("ret : %ld\n", ret); + printf("put_next : %d\n", infd); + err = helper_kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } +// sem_post(&metadata_available); + } + +end: + return 0; +} + +int on_update_fd(int key, uint32_t state) +{ + /* let the lib handle the metadata FD */ + if (key == sessiond_metadata) + return 0; + return 1; +} + +int on_recv_fd(struct lttng_consumer_stream *kconsumerd_fd) +{ + int ret; + struct mmap_stream *new_info; + size_t tmp_mmap_len; + + printf("Receiving %d\n", helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd)); + /* Opening the tracefile in write mode */ + if (helper_get_lttng_consumer_stream_path_name(kconsumerd_fd) != NULL) { + ret = open(helper_get_lttng_consumer_stream_path_name(kconsumerd_fd), + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + perror("open"); + goto end; + } + helper_set_lttng_consumer_stream_out_fd(kconsumerd_fd, ret); + } + + if (helper_get_lttng_consumer_stream_output(kconsumerd_fd) == LTTNG_EVENT_MMAP) { + new_info = malloc(sizeof(struct mmap_stream)); + new_info->fd = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + bt_list_add(&new_info->list, &mmap_list.head); + + /* get the len of the mmap region */ + ret = helper_kernctl_get_mmap_len(helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd), + &tmp_mmap_len); + if (ret != 0) { + ret = errno; + perror("helper_kernctl_get_mmap_len"); + goto end; + } + helper_set_lttng_consumer_stream_mmap_len(kconsumerd_fd, tmp_mmap_len); + printf("mmap len : %ld\n", tmp_mmap_len); + + helper_set_lttng_consumer_stream_mmap_base(kconsumerd_fd, + mmap(NULL, helper_get_lttng_consumer_stream_mmap_len(kconsumerd_fd), + PROT_READ, MAP_PRIVATE, helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd), 0)); + if (helper_get_lttng_consumer_stream_mmap_base(kconsumerd_fd) == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto end; + } + + g_ptr_array_add(lttng_consumer_stream_array, kconsumerd_fd); + ret = 1; + } else { + consumerd_metadata = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + sessiond_metadata = helper_get_lttng_consumer_stream_key(kconsumerd_fd); + ret = 0; + } + +end: + return ret; +} + + +int setup_consumer(char *command_sock_path, pthread_t *threads, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + ctx = helper_lttng_consumer_create(HELPER_LTTNG_CONSUMER_KERNEL, + read_subbuffer, NULL, on_recv_fd, on_update_fd); + if (!ctx) + goto end; + + unlink(command_sock_path); + helper_lttng_consumer_set_command_sock_path(ctx, command_sock_path); + helper_lttng_consumer_init(); + + /* Create the thread to manage the receive of fd */ + ret = pthread_create(&threads[0], NULL, helper_lttng_consumer_thread_receive_fds, + (void *) ctx); + if (ret != 0) { + perror("pthread_create"); + goto end; + } + /* Create thread to manage the polling/writing of traces */ + ret = pthread_create(&threads[1], NULL, helper_lttng_consumer_thread_poll_fds, + (void *) ctx); + if (ret != 0) { + perror("pthread_create"); + goto end; + } + +end: + return ret; +} + +int setup_live_tracing() +{ + struct lttng_domain dom; + struct lttng_channel chan; + char *channel_name = "mmapchan"; + struct lttng_consumer_local_data *ctx = NULL; + struct lttng_event ev; + int ret = 0; + char *command_sock_path = "/tmp/consumerd_sock"; + static pthread_t threads[2]; /* recv_fd, poll */ + struct lttng_event_context kctxpid, kctxcomm, kctxppid, kctxtid; + /* list of snapshots currently not consumed */ + GPtrArray *available_snapshots; + + struct lttng_handle *handle; + + BT_INIT_LIST_HEAD(&mmap_list.head); + + lttng_consumer_stream_array = g_ptr_array_new(); + + if ((ret = setup_consumer(command_sock_path, threads, ctx)) < 0) { + fprintf(stderr,"error setting up consumer\n"); + goto end; + } + + available_snapshots = g_ptr_array_new(); + + /* setup the session */ + dom.type = LTTNG_DOMAIN_KERNEL; + + ret = system("rm -rf /tmp/livesession"); + + if ((ret = lttng_create_session("test", "/tmp/livesession")) < 0) { + fprintf(stderr,"error creating the session : %s\n", + helper_lttcomm_get_readable_code(ret)); + goto end; + } + + if ((handle = lttng_create_handle("test", &dom)) == NULL) { + fprintf(stderr,"error creating handle\n"); + goto end; + } + + if ((ret = lttng_register_consumer(handle, command_sock_path)) < 0) { + fprintf(stderr,"error registering consumer : %s\n", + helper_lttcomm_get_readable_code(ret)); + goto end; + } + + strcpy(chan.name, channel_name); + chan.attr.overwrite = 1; +// chan.attr.subbuf_size = 32768; + chan.attr.subbuf_size = 1048576; /* 1MB */ + chan.attr.num_subbuf = 4; + chan.attr.switch_timer_interval = 0; + chan.attr.read_timer_interval = 200; + chan.attr.output = LTTNG_EVENT_MMAP; + + if ((ret = lttng_enable_channel(handle, &chan)) < 0) { + fprintf(stderr,"error creating channel : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + sprintf(ev.name, "sched_switch"); + ev.type = LTTNG_EVENT_TRACEPOINT; + + //if ((ret = lttng_enable_event(handle, NULL, channel_name)) < 0) { + if ((ret = lttng_enable_event(handle, &ev, channel_name)) < 0) { + fprintf(stderr,"error enabling event : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + kctxpid.ctx = LTTNG_EVENT_CONTEXT_PID; + lttng_add_context(handle, &kctxpid, NULL, NULL); + kctxppid.ctx = LTTNG_EVENT_CONTEXT_PPID; + lttng_add_context(handle, &kctxppid, NULL, NULL); + kctxcomm.ctx = LTTNG_EVENT_CONTEXT_PROCNAME; + lttng_add_context(handle, &kctxcomm, NULL, NULL); + kctxtid.ctx = LTTNG_EVENT_CONTEXT_TID; + lttng_add_context(handle, &kctxtid, NULL, NULL); + + if ((ret = lttng_start_tracing("test")) < 0) { + fprintf(stderr,"error starting tracing : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + helper_kernctl_buffer_flush(consumerd_metadata); + sleep(10); + + lttng_stop_tracing("test"); + lttng_destroy_session("test"); + + /* block until metadata is ready */ + //sem_init(&metadata_available, 0, 0); + + //init_lttngtop(); + +end: + return ret; +} + int main(int argc, char **argv) { int ret; @@ -647,31 +898,38 @@ int main(int argc, char **argv) exit(EXIT_SUCCESS); } - init_lttngtop(); - - bt_ctx = bt_context_create(); - ret = bt_context_add_traces_recursive(bt_ctx, opt_input_path, "ctf", NULL); - if (ret < 0) { - fprintf(stderr, "[error] Opening the trace\n"); - goto end; - } - - ret = check_requirements(bt_ctx); - if (ret < 0) { - fprintf(stderr, "[error] some mandatory contexts were missing, exiting.\n"); + if (!opt_input_path) { + printf("live tracing enabled\n"); + setup_live_tracing(); goto end; - } + } else { + init_lttngtop(); + + bt_ctx = bt_context_create(); + ret = bt_context_add_traces_recursive(bt_ctx, opt_input_path, "ctf", NULL); + if (ret < 0) { + fprintf(stderr, "[error] Opening the trace\n"); + goto end; + } - pthread_create(&display_thread, NULL, ncurses_display, (void *) NULL); - pthread_create(&timer_thread, NULL, refresh_thread, (void *) NULL); + ret = check_requirements(bt_ctx); + if (ret < 0) { + fprintf(stderr, "[error] some mandatory contexts were missing, exiting.\n"); + goto end; + } + pthread_create(&display_thread, NULL, ncurses_display, (void *) NULL); + pthread_create(&timer_thread, NULL, refresh_thread, (void *) NULL); - iter_trace(bt_ctx); + iter_trace(bt_ctx); - quit = 1; - pthread_join(display_thread, NULL); - pthread_join(timer_thread, NULL); + quit = 1; + pthread_join(display_thread, NULL); + pthread_join(timer_thread, NULL); + } end: - bt_context_put(bt_ctx); + if (bt_ctx) + bt_context_put(bt_ctx); + return 0; } -- 2.34.1