X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libustd%2Flibustd.c;h=5cc210806d80203cb5acc8525f19124a101d8a1b;hb=0f973e451177e61b47047f5a803f1d9c80f5a642;hp=3c934536c6785e6d2519b37cc11886dc2351cc8e;hpb=5f9aacae75a4181a54bf16f31ce8e18229dddc60;p=ust.git diff --git a/libustd/libustd.c b/libustd/libustd.c index 3c93453..5cc2108 100644 --- a/libustd/libustd.c +++ b/libustd/libustd.c @@ -18,7 +18,10 @@ #define _GNU_SOURCE +#include #include +#include +#include #include #include #include @@ -29,7 +32,8 @@ #include #include -#include "libustd.h" +#include +#include "lowlevel.h" #include "usterr.h" #include "ustcomm.h" @@ -57,8 +61,14 @@ int get_subbuffer(struct buffer_info *buf) int retval; int result; - asprintf(&send_msg, "get_subbuffer %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if (asprintf(&send_msg, "get_subbuffer %s", buf->name) < 0) { + ERR("get_subbuffer : asprintf failed (%s)", + buf->name); + retval = -1; + goto end; + } + + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) { DBG("app died while being traced"); retval = GET_SUBBUF_DIED; @@ -78,20 +88,14 @@ int get_subbuffer(struct buffer_info *buf) goto end_rep; } - if(!strcmp(rep_code, "OK")) { + if (!strcmp(rep_code, "OK")) { DBG("got subbuffer %s", buf->name); retval = GET_SUBBUF_OK; - } - else if(nth_token_is(received_msg, "END", 0) == 1) { - retval = GET_SUBBUF_DONE; - goto end_rep; - } - else if(!strcmp(received_msg, "NOTFOUND")) { + } else if(!strcmp(received_msg, "NOTFOUND")) { DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); retval = GET_SUBBUF_DIED; goto end_rep; - } - else { + } else { DBG("error getting subbuffer %s", buf->name); retval = -1; } @@ -117,8 +121,13 @@ int put_subbuffer(struct buffer_info *buf) int retval; int result; - asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if (asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old) < 0) { + ERR("put_subbuffer : asprintf failed (%s %ld)", + buf->name, buf->consumed_old); + retval = -1; + goto end; + } + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) { retval = PUT_SUBBUF_DIED; goto end; @@ -189,8 +198,9 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, char *received_msg; int result; struct shmid_ds shmds; + struct ustcomm_header header; - buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); + buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info)); if(buf == NULL) { ERR("add_buffer: insufficient memory"); return NULL; @@ -199,16 +209,20 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, buf->name = bufname; buf->pid = pid; + /* FIXME: Fix all the freeing and exit sequence from this functions */ /* connect to app */ - result = ustcomm_connect_app(buf->pid, &buf->conn); + result = ustcomm_connect_app(buf->pid, &buf->app_sock); if(result) { WARN("unable to connect to process, it probably died before we were able to connect"); return NULL; } /* get pidunique */ - asprintf(&send_msg, "get_pidunique"); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if (asprintf(&send_msg, "get_pidunique") < 0) { + ERR("connect_buffer : asprintf failed (get_pidunique)"); + return NULL; + } + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_pidunique)"); @@ -227,8 +241,12 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, DBG("got pidunique %lld", buf->pidunique); /* get shmid */ - asprintf(&send_msg, "get_shmid %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if (asprintf(&send_msg, "get_shmid %s", buf->name) < 0) { + ERR("connect_buffer : asprintf failed (get_schmid %s)", + buf->name); + return NULL; + } + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_shmid)"); @@ -247,8 +265,12 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid); /* get n_subbufs */ - asprintf(&send_msg, "get_n_subbufs %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if (asprintf(&send_msg, "get_n_subbufs %s", buf->name) < 0) { + ERR("connect_buffer : asprintf failed (get_n_subbufs %s)", + buf->name); + return NULL; + } + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(g_n_subbufs)"); @@ -267,8 +289,12 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, DBG("got n_subbufs %d", buf->n_subbufs); /* get subbuf size */ - asprintf(&send_msg, "get_subbuf_size %s", buf->name); - result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); + if (asprintf(&send_msg, "get_subbuf_size %s", buf->name) < 0) { + ERR("connect_buffer : asprintf failed (get_subbuf_size %s)", + buf->name); + return NULL; + } + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_subbuf_size)"); @@ -309,6 +335,32 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, } buf->memlen = shmds.shm_segsz; + /* get buffer pipe fd */ + memset(&header, 0, sizeof(header)); + if (asprintf(&send_msg, "get_buffer_fd %s", buf->name) < 0) { + ERR("connect_buffer : asprintf failed (get_buffer_fd %s)", + buf->name); + return NULL; + } + header.size = strlen(send_msg) + 1; + result = ustcomm_send(buf->app_sock, &header, send_msg); + free(send_msg); + if (result <= 0) { + ERR("ustcomm_send failed."); + return NULL; + } + result = ustcomm_recv_fd(buf->app_sock, &header, NULL, &buf->pipe_fd); + if (result <= 0) { + ERR("ustcomm_recv_fd failed"); + return NULL; + } else { + struct stat temp; + fstat(buf->pipe_fd, &temp); + if (!S_ISFIFO(temp.st_mode)) { + ERR("Didn't receive a fifo from the app"); + return NULL; + } + } if(instance->callbacks->on_open_buffer) instance->callbacks->on_open_buffer(instance->callbacks, buf); @@ -328,7 +380,7 @@ static void destroy_buffer(struct libustd_callbacks *callbacks, { int result; - result = ustcomm_close_app(&buf->conn); + result = close(buf->app_sock); if(result == -1) { WARN("problem calling ustcomm_close_app"); } @@ -351,22 +403,26 @@ static void destroy_buffer(struct libustd_callbacks *callbacks, int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf) { - int result; + int result, read_result; + char read_buf; pthread_cleanup_push(decrement_active_buffers, instance); for(;;) { + read_result = read(buf->pipe_fd, &read_buf, 1); /* get the subbuffer */ - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; - } - else if(result == GET_SUBBUF_DONE) { - /* this is done */ - break; - } - else if(result == GET_SUBBUF_DIED) { + if (read_result == 1) { + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } else if (result == GET_SUBBUF_DIED) { + finish_consuming_dead_subbuffer(instance->callbacks, buf); + break; + } + } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) || + result == 0) { + DBG("App died while being traced"); finish_consuming_dead_subbuffer(instance->callbacks, buf); break; } @@ -486,7 +542,7 @@ int start_consuming_buffer( DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname); - args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args)); + args = (struct consumer_thread_args *) zmalloc(sizeof(struct consumer_thread_args)); args->pid = pid; args->bufname = strdup(bufname); @@ -507,64 +563,91 @@ int start_consuming_buffer( return 0; } +static void process_client_cmd(char *recvbuf, struct libustd_instance *instance) +{ + if(!strncmp(recvbuf, "collect", 7)) { + pid_t pid; + char *bufname = NULL; + int result; + + result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); + if (result != 2) { + ERR("parsing error: %s", recvbuf); + goto free_bufname; + } + + result = start_consuming_buffer(instance, pid, bufname); + if (result < 0) { + ERR("error in add_buffer"); + goto free_bufname; + } + + free_bufname: + if (bufname) { + free(bufname); + } + } else if(!strncmp(recvbuf, "exit", 4)) { + /* Only there to force poll to return */ + } else { + WARN("unknown command: %s", recvbuf); + } +} + +#define MAX_EVENTS 10 int libustd_start_instance(struct libustd_instance *instance) { - int result; - int timeout = -1; + struct ustcomm_sock *epoll_sock; + struct epoll_event events[MAX_EVENTS]; + struct sockaddr addr; + int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout; if(!instance->is_init) { ERR("libustd instance not initialized"); return 1; } + epoll_fd = instance->epoll_fd; + + timeout = -1; /* app loop */ for(;;) { - char *recvbuf; - - /* check for requests on our public socket */ - result = ustcomm_ustd_recv_message(&instance->comm, &recvbuf, NULL, timeout); - if(result == -1 && errno == EINTR) { + nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout); + if (nfds == -1 && errno == EINTR) { /* Caught signal */ + } else if (nfds == -1) { + PERROR("libustd_start_instance: epoll_wait failed"); + continue; } - else if(result == -1) { - ERR("error in ustcomm_ustd_recv_message"); - goto loop_end; - } - else if(result > 0) { - if(!strncmp(recvbuf, "collect", 7)) { - pid_t pid; - char *bufname; - int result; - - result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); - if(result != 2) { - ERR("parsing error: %s", recvbuf); - goto free_bufname; - } - result = start_consuming_buffer(instance, pid, bufname); - if(result < 0) { - ERR("error in add_buffer"); - goto free_bufname; + for (i = 0; i < nfds; ++i) { + epoll_sock = (struct ustcomm_sock *)events[i].data.ptr; + if (epoll_sock == instance->listen_sock) { + addr_size = sizeof(struct sockaddr); + accept_fd = accept(epoll_sock->fd, + &addr, + (socklen_t *)&addr_size); + if (accept_fd == -1) { + PERROR("libustd_start_instance: " + "accept failed"); + continue; + } + ustcomm_init_sock(accept_fd, epoll_fd, + &instance->connections); + } else { + char *msg = NULL; + result = recv_message_conn(epoll_sock->fd, &msg); + if (result == 0) { + ustcomm_del_sock(epoll_sock, 0); + } else if (msg) { + process_client_cmd(msg, instance); + free(msg); } - free_bufname: - free(bufname); - } - else if(!strncmp(recvbuf, "exit", 4)) { - /* Only there to force poll to return */ } - else { - WARN("unknown command: %s", recvbuf); - } - - free(recvbuf); } - loop_end: - - if(instance->quit_program) { + if (instance->quit_program) { pthread_mutex_lock(&instance->mutex); if(instance->active_buffers == 0) { pthread_mutex_unlock(&instance->mutex); @@ -583,10 +666,13 @@ int libustd_start_instance(struct libustd_instance *instance) return 0; } +/* FIXME: threads and connections !? */ void libustd_delete_instance(struct libustd_instance *instance) { - if(instance->is_init) - ustcomm_fini_ustd(&instance->comm); + if (instance->is_init) { + ustcomm_del_named_sock(instance->listen_sock, 0); + close(instance->epoll_fd); + } pthread_mutex_destroy(&instance->mutex); free(instance->sock_path); @@ -634,13 +720,15 @@ int libustd_stop_instance(struct libustd_instance *instance, int send_msg) return 0; } -struct libustd_instance *libustd_new_instance( - struct libustd_callbacks *callbacks, char *sock_path) +struct libustd_instance +*libustd_new_instance(struct libustd_callbacks *callbacks, + char *sock_path) { struct libustd_instance *instance = - malloc(sizeof(struct libustd_instance)); - if(!instance) + zmalloc(sizeof(struct libustd_instance)); + if(!instance) { return NULL; + } instance->callbacks = callbacks; instance->quit_program = 0; @@ -648,18 +736,75 @@ struct libustd_instance *libustd_new_instance( instance->active_buffers = 0; pthread_mutex_init(&instance->mutex, NULL); - if(sock_path) + if (sock_path) { instance->sock_path = strdup(sock_path); - else + } else { instance->sock_path = NULL; + } return instance; } +static int init_ustd_socket(struct libustd_instance *instance) +{ + char *name; + + if (instance->sock_path) { + if (asprintf(&name, "%s", instance->sock_path) < 0) { + ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)", + instance->sock_path); + return -1; + } + } else { + int result; + + /* Only check if socket dir exists if we are using the default directory */ + result = ensure_dir_exists(SOCK_DIR); + if (result == -1) { + ERR("Unable to create socket directory %s", SOCK_DIR); + return -1; + } + + if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) { + ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)", + SOCK_DIR); + return -1; + } + } + + /* Set up epoll */ + instance->epoll_fd = epoll_create(MAX_EVENTS); + if (instance->epoll_fd == -1) { + ERR("epoll_create failed, start instance bailing"); + goto free_name; + } + + /* Create the named socket */ + instance->listen_sock = ustcomm_init_named_socket(name, + instance->epoll_fd); + if(!instance->listen_sock) { + ERR("error initializing named socket at %s", name); + goto close_epoll; + } + + INIT_LIST_HEAD(&instance->connections); + + free(name); + + return 0; + +close_epoll: + close(instance->epoll_fd); +free_name: + free(name); + + return -1; +} + int libustd_init_instance(struct libustd_instance *instance) { int result; - result = ustcomm_init_ustd(&instance->comm, instance->sock_path); + result = init_ustd_socket(instance); if(result == -1) { ERR("failed to initialize socket"); return 1;