Re-write ustcomm parts of UST v2
[ust.git] / libustd / libustd.c
index 999e4dadfcf65da35e4f669e7e0ca71fb3e837ef..5cc210806d80203cb5acc8525f19124a101d8a1b 100644 (file)
 
 #define _GNU_SOURCE
 
+#include <sys/epoll.h>
 #include <sys/shm.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 #include <unistd.h>
 #include <pthread.h>
 #include <signal.h>
@@ -64,7 +67,8 @@ int get_subbuffer(struct buffer_info *buf)
                retval = -1;
                goto end;
        }
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+
+       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
        if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
                DBG("app died while being traced");
                retval = GET_SUBBUF_DIED;
@@ -84,20 +88,14 @@ int get_subbuffer(struct buffer_info *buf)
                goto end_rep;
        }
 
-       if(!strcmp(rep_code, "OK")) {
+       if (!strcmp(rep_code, "OK")) {
                DBG("got subbuffer %s", buf->name);
                retval = GET_SUBBUF_OK;
-       }
-       else if(nth_token_is(received_msg, "END", 0) == 1) {
-               retval = GET_SUBBUF_DONE;
-               goto end_rep;
-       }
-       else if(!strcmp(received_msg, "NOTFOUND")) {
+       } else if(!strcmp(received_msg, "NOTFOUND")) {
                DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
                retval = GET_SUBBUF_DIED;
                goto end_rep;
-       }
-       else {
+       } else {
                DBG("error getting subbuffer %s", buf->name);
                retval = -1;
        }
@@ -129,7 +127,7 @@ int put_subbuffer(struct buffer_info *buf)
                retval = -1;
                goto end;
        }
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
        if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
                retval = PUT_SUBBUF_DIED;
                goto end;
@@ -200,6 +198,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
        char *received_msg;
        int result;
        struct shmid_ds shmds;
+       struct ustcomm_header header;
 
        buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
        if(buf == NULL) {
@@ -207,18 +206,12 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
                return NULL;
        }
 
-       buf->conn = malloc(sizeof(struct ustcomm_connection));
-       if(buf->conn == NULL) {
-               ERR("add_buffer: insufficient memory");
-               free(buf);
-               return NULL;
-       }
-
        buf->name = bufname;
        buf->pid = pid;
 
+       /* FIXME: Fix all the freeing and exit sequence from this functions */
        /* connect to app */
-       result = ustcomm_connect_app(buf->pid, buf->conn);
+       result = ustcomm_connect_app(buf->pid, &buf->app_sock);
        if(result) {
                WARN("unable to connect to process, it probably died before we were able to connect");
                return NULL;
@@ -229,7 +222,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
                ERR("connect_buffer : asprintf failed (get_pidunique)");
                return NULL;
        }
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
        free(send_msg);
        if(result == -1) {
                ERR("problem in ustcomm_send_request(get_pidunique)");
@@ -253,7 +246,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
                    buf->name);
                return NULL;
        }
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
        free(send_msg);
        if(result == -1) {
                ERR("problem in ustcomm_send_request(get_shmid)");
@@ -277,7 +270,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
                    buf->name);
                return NULL;
        }
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
        free(send_msg);
        if(result == -1) {
                ERR("problem in ustcomm_send_request(g_n_subbufs)");
@@ -301,7 +294,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
                    buf->name);
                return NULL;
        }
-       result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+       result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
        free(send_msg);
        if(result == -1) {
                ERR("problem in ustcomm_send_request(get_subbuf_size)");
@@ -342,6 +335,32 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
        }
        buf->memlen = shmds.shm_segsz;
 
+       /* get buffer pipe fd */
+       memset(&header, 0, sizeof(header));
+       if (asprintf(&send_msg, "get_buffer_fd %s", buf->name) < 0) {
+               ERR("connect_buffer : asprintf failed (get_buffer_fd %s)",
+                   buf->name);
+               return NULL;
+       }
+       header.size = strlen(send_msg) + 1;
+       result = ustcomm_send(buf->app_sock, &header, send_msg);
+       free(send_msg);
+       if (result <= 0) {
+               ERR("ustcomm_send failed.");
+               return NULL;
+       }
+       result = ustcomm_recv_fd(buf->app_sock, &header, NULL, &buf->pipe_fd);
+       if (result <= 0) {
+               ERR("ustcomm_recv_fd failed");
+               return NULL;
+       } else {
+               struct stat temp;
+               fstat(buf->pipe_fd, &temp);
+               if (!S_ISFIFO(temp.st_mode)) {
+                       ERR("Didn't receive a fifo from the app");
+                       return NULL;
+               }
+       }
        if(instance->callbacks->on_open_buffer)
                instance->callbacks->on_open_buffer(instance->callbacks, buf);
 
@@ -361,7 +380,7 @@ static void destroy_buffer(struct libustd_callbacks *callbacks,
 {
        int result;
 
-       result = ustcomm_close_app(buf->conn);
+       result = close(buf->app_sock);
        if(result == -1) {
                WARN("problem calling ustcomm_close_app");
        }
@@ -379,28 +398,31 @@ static void destroy_buffer(struct libustd_callbacks *callbacks,
        if(callbacks->on_close_buffer)
                callbacks->on_close_buffer(callbacks, buf);
 
-       free(buf->conn);
        free(buf);
 }
 
 int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
 {
-       int result;
+       int result, read_result;
+       char read_buf;
 
        pthread_cleanup_push(decrement_active_buffers, instance);
 
        for(;;) {
+               read_result = read(buf->pipe_fd, &read_buf, 1);
                /* get the subbuffer */
-               result = get_subbuffer(buf);
-               if(result == -1) {
-                       ERR("error getting subbuffer");
-                       continue;
-               }
-               else if(result == GET_SUBBUF_DONE) {
-                       /* this is done */
-                       break;
-               }
-               else if(result == GET_SUBBUF_DIED) {
+               if (read_result == 1) {
+                       result = get_subbuffer(buf);
+                       if(result == -1) {
+                               ERR("error getting subbuffer");
+                               continue;
+                       } else if (result == GET_SUBBUF_DIED) {
+                               finish_consuming_dead_subbuffer(instance->callbacks, buf);
+                               break;
+                       }
+               } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) ||
+                          result == 0) {
+                       DBG("App died while being traced");
                        finish_consuming_dead_subbuffer(instance->callbacks, buf);
                        break;
                }
@@ -541,64 +563,91 @@ int start_consuming_buffer(
 
        return 0;
 }
+static void process_client_cmd(char *recvbuf, struct libustd_instance *instance)
+{
+       if(!strncmp(recvbuf, "collect", 7)) {
+               pid_t pid;
+               char *bufname = NULL;
+               int result;
+
+               result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
+               if (result != 2) {
+                       ERR("parsing error: %s", recvbuf);
+                       goto free_bufname;
+               }
+
+               result = start_consuming_buffer(instance, pid, bufname);
+               if (result < 0) {
+                       ERR("error in add_buffer");
+                       goto free_bufname;
+               }
+
+       free_bufname:
+               if (bufname) {
+                       free(bufname);
+               }
+       } else if(!strncmp(recvbuf, "exit", 4)) {
+               /* Only there to force poll to return */
+       } else {
+               WARN("unknown command: %s", recvbuf);
+       }
+}
+
+#define MAX_EVENTS 10
 
 int libustd_start_instance(struct libustd_instance *instance)
 {
-       int result;
-       int timeout = -1;
+       struct ustcomm_sock *epoll_sock;
+       struct epoll_event events[MAX_EVENTS];
+       struct sockaddr addr;
+       int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
 
        if(!instance->is_init) {
                ERR("libustd instance not initialized");
                return 1;
        }
+       epoll_fd = instance->epoll_fd;
+
+       timeout = -1;
 
        /* app loop */
        for(;;) {
-               char *recvbuf;
-
-               /* check for requests on our public socket */
-               result = ustcomm_ustd_recv_message(instance->comm, &recvbuf, NULL, timeout);
-               if(result == -1 && errno == EINTR) {
+               nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
+               if (nfds == -1 && errno == EINTR) {
                        /* Caught signal */
+               } else if (nfds == -1) {
+                       PERROR("libustd_start_instance: epoll_wait failed");
+                       continue;
                }
-               else if(result == -1) {
-                       ERR("error in ustcomm_ustd_recv_message");
-                       goto loop_end;
-               }
-               else if(result > 0) {
-                       if(!strncmp(recvbuf, "collect", 7)) {
-                               pid_t pid;
-                               char *bufname;
-                               int result;
-
-                               result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
-                               if(result != 2) {
-                                       ERR("parsing error: %s", recvbuf);
-                                       goto free_bufname;
-                               }
 
-                               result = start_consuming_buffer(instance, pid, bufname);
-                               if(result < 0) {
-                                       ERR("error in add_buffer");
-                                       goto free_bufname;
+               for (i = 0; i < nfds; ++i) {
+                       epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
+                       if (epoll_sock == instance->listen_sock) {
+                               addr_size = sizeof(struct sockaddr);
+                               accept_fd = accept(epoll_sock->fd,
+                                                  &addr,
+                                                  (socklen_t *)&addr_size);
+                               if (accept_fd == -1) {
+                                       PERROR("libustd_start_instance: "
+                                              "accept failed");
+                                       continue;
+                               }
+                               ustcomm_init_sock(accept_fd, epoll_fd,
+                                                &instance->connections);
+                       } else {
+                               char *msg = NULL;
+                               result = recv_message_conn(epoll_sock->fd, &msg);
+                               if (result == 0) {
+                                       ustcomm_del_sock(epoll_sock, 0);
+                               } else if (msg) {
+                                       process_client_cmd(msg, instance);
+                                       free(msg);
                                }
 
-                               free_bufname:
-                               free(bufname);
                        }
-                       else if(!strncmp(recvbuf, "exit", 4)) {
-                               /* Only there to force poll to return */
-                       }
-                       else {
-                               WARN("unknown command: %s", recvbuf);
-                       }
-
-                       free(recvbuf);
                }
 
-               loop_end:
-
-               if(instance->quit_program) {
+               if (instance->quit_program) {
                        pthread_mutex_lock(&instance->mutex);
                        if(instance->active_buffers == 0) {
                                pthread_mutex_unlock(&instance->mutex);
@@ -617,14 +666,16 @@ int libustd_start_instance(struct libustd_instance *instance)
        return 0;
 }
 
+/* FIXME: threads and connections !? */
 void libustd_delete_instance(struct libustd_instance *instance)
 {
-       if(instance->is_init)
-               ustcomm_fini_ustd(instance->comm);
+       if (instance->is_init) {
+               ustcomm_del_named_sock(instance->listen_sock, 0);
+               close(instance->epoll_fd);
+       }
 
        pthread_mutex_destroy(&instance->mutex);
        free(instance->sock_path);
-       free(instance->comm);
        free(instance);
 }
 
@@ -669,17 +720,13 @@ int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
        return 0;
 }
 
-struct libustd_instance *libustd_new_instance(
-       struct libustd_callbacks *callbacks, char *sock_path)
+struct libustd_instance
+*libustd_new_instance(struct libustd_callbacks *callbacks,
+                     char *sock_path)
 {
        struct libustd_instance *instance =
                zmalloc(sizeof(struct libustd_instance));
-       if(!instance)
-               return NULL;
-
-       instance->comm = malloc(sizeof(struct ustcomm_ustd));
-       if(!instance->comm) {
-               free(instance);
+       if(!instance) {
                return NULL;
        }
 
@@ -689,18 +736,75 @@ struct libustd_instance *libustd_new_instance(
        instance->active_buffers = 0;
        pthread_mutex_init(&instance->mutex, NULL);
 
-       if(sock_path)
+       if (sock_path) {
                instance->sock_path = strdup(sock_path);
-       else
+       } else {
                instance->sock_path = NULL;
+       }
 
        return instance;
 }
 
+static int init_ustd_socket(struct libustd_instance *instance)
+{
+       char *name;
+
+       if (instance->sock_path) {
+               if (asprintf(&name, "%s", instance->sock_path) < 0) {
+                       ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
+                           instance->sock_path);
+                       return -1;
+               }
+       } else {
+               int result;
+
+               /* Only check if socket dir exists if we are using the default directory */
+               result = ensure_dir_exists(SOCK_DIR);
+               if (result == -1) {
+                       ERR("Unable to create socket directory %s", SOCK_DIR);
+                       return -1;
+               }
+
+               if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) {
+                       ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
+                           SOCK_DIR);
+                       return -1;
+               }
+       }
+
+       /* Set up epoll */
+       instance->epoll_fd = epoll_create(MAX_EVENTS);
+       if (instance->epoll_fd == -1) {
+               ERR("epoll_create failed, start instance bailing");
+               goto free_name;
+       }
+
+       /* Create the named socket */
+       instance->listen_sock = ustcomm_init_named_socket(name,
+                                                         instance->epoll_fd);
+       if(!instance->listen_sock) {
+               ERR("error initializing named socket at %s", name);
+               goto close_epoll;
+       }
+
+       INIT_LIST_HEAD(&instance->connections);
+
+       free(name);
+
+       return 0;
+
+close_epoll:
+       close(instance->epoll_fd);
+free_name:
+       free(name);
+
+       return -1;
+}
+
 int libustd_init_instance(struct libustd_instance *instance)
 {
        int result;
-       result = ustcomm_init_ustd(instance->comm, instance->sock_path);
+       result = init_ustd_socket(instance);
        if(result == -1) {
                ERR("failed to initialize socket");
                return 1;
This page took 0.02964 seconds and 4 git commands to generate.