X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=libustd%2Flibustd.c;h=5cc210806d80203cb5acc8525f19124a101d8a1b;hb=4723ca096d740ff93da400df304c9902e9834e5f;hp=999e4dadfcf65da35e4f669e7e0ca71fb3e837ef;hpb=d9ac3d712ae55d3049000ead812450e2fe067387;p=ust.git diff --git a/libustd/libustd.c b/libustd/libustd.c index 999e4da..5cc2108 100644 --- a/libustd/libustd.c +++ b/libustd/libustd.c @@ -18,7 +18,10 @@ #define _GNU_SOURCE +#include #include +#include +#include #include #include #include @@ -64,7 +67,8 @@ int get_subbuffer(struct buffer_info *buf) retval = -1; goto end; } - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); + + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) { DBG("app died while being traced"); retval = GET_SUBBUF_DIED; @@ -84,20 +88,14 @@ int get_subbuffer(struct buffer_info *buf) goto end_rep; } - if(!strcmp(rep_code, "OK")) { + if (!strcmp(rep_code, "OK")) { DBG("got subbuffer %s", buf->name); retval = GET_SUBBUF_OK; - } - else if(nth_token_is(received_msg, "END", 0) == 1) { - retval = GET_SUBBUF_DONE; - goto end_rep; - } - else if(!strcmp(received_msg, "NOTFOUND")) { + } else if(!strcmp(received_msg, "NOTFOUND")) { DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name); retval = GET_SUBBUF_DIED; goto end_rep; - } - else { + } else { DBG("error getting subbuffer %s", buf->name); retval = -1; } @@ -129,7 +127,7 @@ int put_subbuffer(struct buffer_info *buf) retval = -1; goto end; } - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) { retval = PUT_SUBBUF_DIED; goto end; @@ -200,6 +198,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, char *received_msg; int result; struct shmid_ds shmds; + struct ustcomm_header header; buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info)); if(buf == NULL) { @@ -207,18 +206,12 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, return NULL; } - buf->conn = malloc(sizeof(struct ustcomm_connection)); - if(buf->conn == NULL) { - ERR("add_buffer: insufficient memory"); - free(buf); - return NULL; - } - buf->name = bufname; buf->pid = pid; + /* FIXME: Fix all the freeing and exit sequence from this functions */ /* connect to app */ - result = ustcomm_connect_app(buf->pid, buf->conn); + result = ustcomm_connect_app(buf->pid, &buf->app_sock); if(result) { WARN("unable to connect to process, it probably died before we were able to connect"); return NULL; @@ -229,7 +222,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, ERR("connect_buffer : asprintf failed (get_pidunique)"); return NULL; } - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_pidunique)"); @@ -253,7 +246,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, buf->name); return NULL; } - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_shmid)"); @@ -277,7 +270,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, buf->name); return NULL; } - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(g_n_subbufs)"); @@ -301,7 +294,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, buf->name); return NULL; } - result = ustcomm_send_request(buf->conn, send_msg, &received_msg); + result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg); free(send_msg); if(result == -1) { ERR("problem in ustcomm_send_request(get_subbuf_size)"); @@ -342,6 +335,32 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, } buf->memlen = shmds.shm_segsz; + /* get buffer pipe fd */ + memset(&header, 0, sizeof(header)); + if (asprintf(&send_msg, "get_buffer_fd %s", buf->name) < 0) { + ERR("connect_buffer : asprintf failed (get_buffer_fd %s)", + buf->name); + return NULL; + } + header.size = strlen(send_msg) + 1; + result = ustcomm_send(buf->app_sock, &header, send_msg); + free(send_msg); + if (result <= 0) { + ERR("ustcomm_send failed."); + return NULL; + } + result = ustcomm_recv_fd(buf->app_sock, &header, NULL, &buf->pipe_fd); + if (result <= 0) { + ERR("ustcomm_recv_fd failed"); + return NULL; + } else { + struct stat temp; + fstat(buf->pipe_fd, &temp); + if (!S_ISFIFO(temp.st_mode)) { + ERR("Didn't receive a fifo from the app"); + return NULL; + } + } if(instance->callbacks->on_open_buffer) instance->callbacks->on_open_buffer(instance->callbacks, buf); @@ -361,7 +380,7 @@ static void destroy_buffer(struct libustd_callbacks *callbacks, { int result; - result = ustcomm_close_app(buf->conn); + result = close(buf->app_sock); if(result == -1) { WARN("problem calling ustcomm_close_app"); } @@ -379,28 +398,31 @@ static void destroy_buffer(struct libustd_callbacks *callbacks, if(callbacks->on_close_buffer) callbacks->on_close_buffer(callbacks, buf); - free(buf->conn); free(buf); } int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf) { - int result; + int result, read_result; + char read_buf; pthread_cleanup_push(decrement_active_buffers, instance); for(;;) { + read_result = read(buf->pipe_fd, &read_buf, 1); /* get the subbuffer */ - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; - } - else if(result == GET_SUBBUF_DONE) { - /* this is done */ - break; - } - else if(result == GET_SUBBUF_DIED) { + if (read_result == 1) { + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } else if (result == GET_SUBBUF_DIED) { + finish_consuming_dead_subbuffer(instance->callbacks, buf); + break; + } + } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) || + result == 0) { + DBG("App died while being traced"); finish_consuming_dead_subbuffer(instance->callbacks, buf); break; } @@ -541,64 +563,91 @@ int start_consuming_buffer( return 0; } +static void process_client_cmd(char *recvbuf, struct libustd_instance *instance) +{ + if(!strncmp(recvbuf, "collect", 7)) { + pid_t pid; + char *bufname = NULL; + int result; + + result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); + if (result != 2) { + ERR("parsing error: %s", recvbuf); + goto free_bufname; + } + + result = start_consuming_buffer(instance, pid, bufname); + if (result < 0) { + ERR("error in add_buffer"); + goto free_bufname; + } + + free_bufname: + if (bufname) { + free(bufname); + } + } else if(!strncmp(recvbuf, "exit", 4)) { + /* Only there to force poll to return */ + } else { + WARN("unknown command: %s", recvbuf); + } +} + +#define MAX_EVENTS 10 int libustd_start_instance(struct libustd_instance *instance) { - int result; - int timeout = -1; + struct ustcomm_sock *epoll_sock; + struct epoll_event events[MAX_EVENTS]; + struct sockaddr addr; + int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout; if(!instance->is_init) { ERR("libustd instance not initialized"); return 1; } + epoll_fd = instance->epoll_fd; + + timeout = -1; /* app loop */ for(;;) { - char *recvbuf; - - /* check for requests on our public socket */ - result = ustcomm_ustd_recv_message(instance->comm, &recvbuf, NULL, timeout); - if(result == -1 && errno == EINTR) { + nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout); + if (nfds == -1 && errno == EINTR) { /* Caught signal */ + } else if (nfds == -1) { + PERROR("libustd_start_instance: epoll_wait failed"); + continue; } - else if(result == -1) { - ERR("error in ustcomm_ustd_recv_message"); - goto loop_end; - } - else if(result > 0) { - if(!strncmp(recvbuf, "collect", 7)) { - pid_t pid; - char *bufname; - int result; - - result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname); - if(result != 2) { - ERR("parsing error: %s", recvbuf); - goto free_bufname; - } - result = start_consuming_buffer(instance, pid, bufname); - if(result < 0) { - ERR("error in add_buffer"); - goto free_bufname; + for (i = 0; i < nfds; ++i) { + epoll_sock = (struct ustcomm_sock *)events[i].data.ptr; + if (epoll_sock == instance->listen_sock) { + addr_size = sizeof(struct sockaddr); + accept_fd = accept(epoll_sock->fd, + &addr, + (socklen_t *)&addr_size); + if (accept_fd == -1) { + PERROR("libustd_start_instance: " + "accept failed"); + continue; + } + ustcomm_init_sock(accept_fd, epoll_fd, + &instance->connections); + } else { + char *msg = NULL; + result = recv_message_conn(epoll_sock->fd, &msg); + if (result == 0) { + ustcomm_del_sock(epoll_sock, 0); + } else if (msg) { + process_client_cmd(msg, instance); + free(msg); } - free_bufname: - free(bufname); } - else if(!strncmp(recvbuf, "exit", 4)) { - /* Only there to force poll to return */ - } - else { - WARN("unknown command: %s", recvbuf); - } - - free(recvbuf); } - loop_end: - - if(instance->quit_program) { + if (instance->quit_program) { pthread_mutex_lock(&instance->mutex); if(instance->active_buffers == 0) { pthread_mutex_unlock(&instance->mutex); @@ -617,14 +666,16 @@ int libustd_start_instance(struct libustd_instance *instance) return 0; } +/* FIXME: threads and connections !? */ void libustd_delete_instance(struct libustd_instance *instance) { - if(instance->is_init) - ustcomm_fini_ustd(instance->comm); + if (instance->is_init) { + ustcomm_del_named_sock(instance->listen_sock, 0); + close(instance->epoll_fd); + } pthread_mutex_destroy(&instance->mutex); free(instance->sock_path); - free(instance->comm); free(instance); } @@ -669,17 +720,13 @@ int libustd_stop_instance(struct libustd_instance *instance, int send_msg) return 0; } -struct libustd_instance *libustd_new_instance( - struct libustd_callbacks *callbacks, char *sock_path) +struct libustd_instance +*libustd_new_instance(struct libustd_callbacks *callbacks, + char *sock_path) { struct libustd_instance *instance = zmalloc(sizeof(struct libustd_instance)); - if(!instance) - return NULL; - - instance->comm = malloc(sizeof(struct ustcomm_ustd)); - if(!instance->comm) { - free(instance); + if(!instance) { return NULL; } @@ -689,18 +736,75 @@ struct libustd_instance *libustd_new_instance( instance->active_buffers = 0; pthread_mutex_init(&instance->mutex, NULL); - if(sock_path) + if (sock_path) { instance->sock_path = strdup(sock_path); - else + } else { instance->sock_path = NULL; + } return instance; } +static int init_ustd_socket(struct libustd_instance *instance) +{ + char *name; + + if (instance->sock_path) { + if (asprintf(&name, "%s", instance->sock_path) < 0) { + ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)", + instance->sock_path); + return -1; + } + } else { + int result; + + /* Only check if socket dir exists if we are using the default directory */ + result = ensure_dir_exists(SOCK_DIR); + if (result == -1) { + ERR("Unable to create socket directory %s", SOCK_DIR); + return -1; + } + + if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) { + ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)", + SOCK_DIR); + return -1; + } + } + + /* Set up epoll */ + instance->epoll_fd = epoll_create(MAX_EVENTS); + if (instance->epoll_fd == -1) { + ERR("epoll_create failed, start instance bailing"); + goto free_name; + } + + /* Create the named socket */ + instance->listen_sock = ustcomm_init_named_socket(name, + instance->epoll_fd); + if(!instance->listen_sock) { + ERR("error initializing named socket at %s", name); + goto close_epoll; + } + + INIT_LIST_HEAD(&instance->connections); + + free(name); + + return 0; + +close_epoll: + close(instance->epoll_fd); +free_name: + free(name); + + return -1; +} + int libustd_init_instance(struct libustd_instance *instance) { int result; - result = ustcomm_init_ustd(instance->comm, instance->sock_path); + result = init_ustd_socket(instance); if(result == -1) { ERR("failed to initialize socket"); return 1;