#define _GNU_SOURCE
+#include <sys/epoll.h>
#include <sys/shm.h>
+#include <sys/types.h>
+#include <sys/stat.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
int retval;
int result;
- asprintf(&send_msg, "get_subbuffer %s", buf->name);
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ if (asprintf(&send_msg, "get_subbuffer %s", buf->name) < 0) {
+ ERR("get_subbuffer : asprintf failed (%s)",
+ buf->name);
+ retval = -1;
+ goto end;
+ }
+
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
DBG("app died while being traced");
retval = GET_SUBBUF_DIED;
goto end_rep;
}
- if(!strcmp(rep_code, "OK")) {
+ if (!strcmp(rep_code, "OK")) {
DBG("got subbuffer %s", buf->name);
retval = GET_SUBBUF_OK;
- }
- else if(nth_token_is(received_msg, "END", 0) == 1) {
- retval = GET_SUBBUF_DONE;
- goto end_rep;
- }
- else if(!strcmp(received_msg, "NOTFOUND")) {
+ } else if(!strcmp(received_msg, "NOTFOUND")) {
DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
retval = GET_SUBBUF_DIED;
goto end_rep;
- }
- else {
+ } else {
DBG("error getting subbuffer %s", buf->name);
retval = -1;
}
int retval;
int result;
- asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ if (asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old) < 0) {
+ ERR("put_subbuffer : asprintf failed (%s %ld)",
+ buf->name, buf->consumed_old);
+ retval = -1;
+ goto end;
+ }
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
retval = PUT_SUBBUF_DIED;
goto end;
char *received_msg;
int result;
struct shmid_ds shmds;
+ struct ustcomm_header header;
buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
if(buf == NULL) {
return NULL;
}
- buf->conn = malloc(sizeof(struct ustcomm_connection));
- if(buf->conn == NULL) {
- ERR("add_buffer: insufficient memory");
- free(buf);
- return NULL;
- }
-
buf->name = bufname;
buf->pid = pid;
+ /* FIXME: Fix all the freeing and exit sequence from this functions */
/* connect to app */
- result = ustcomm_connect_app(buf->pid, buf->conn);
+ result = ustcomm_connect_app(buf->pid, &buf->app_sock);
if(result) {
WARN("unable to connect to process, it probably died before we were able to connect");
return NULL;
}
/* get pidunique */
- asprintf(&send_msg, "get_pidunique");
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ if (asprintf(&send_msg, "get_pidunique") < 0) {
+ ERR("connect_buffer : asprintf failed (get_pidunique)");
+ return NULL;
+ }
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(get_pidunique)");
DBG("got pidunique %lld", buf->pidunique);
/* get shmid */
- asprintf(&send_msg, "get_shmid %s", buf->name);
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ if (asprintf(&send_msg, "get_shmid %s", buf->name) < 0) {
+ ERR("connect_buffer : asprintf failed (get_schmid %s)",
+ buf->name);
+ return NULL;
+ }
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(get_shmid)");
DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
/* get n_subbufs */
- asprintf(&send_msg, "get_n_subbufs %s", buf->name);
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ if (asprintf(&send_msg, "get_n_subbufs %s", buf->name) < 0) {
+ ERR("connect_buffer : asprintf failed (get_n_subbufs %s)",
+ buf->name);
+ return NULL;
+ }
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(g_n_subbufs)");
DBG("got n_subbufs %d", buf->n_subbufs);
/* get subbuf size */
- asprintf(&send_msg, "get_subbuf_size %s", buf->name);
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ if (asprintf(&send_msg, "get_subbuf_size %s", buf->name) < 0) {
+ ERR("connect_buffer : asprintf failed (get_subbuf_size %s)",
+ buf->name);
+ return NULL;
+ }
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(get_subbuf_size)");
}
buf->memlen = shmds.shm_segsz;
+ /* get buffer pipe fd */
+ memset(&header, 0, sizeof(header));
+ if (asprintf(&send_msg, "get_buffer_fd %s", buf->name) < 0) {
+ ERR("connect_buffer : asprintf failed (get_buffer_fd %s)",
+ buf->name);
+ return NULL;
+ }
+ header.size = strlen(send_msg) + 1;
+ result = ustcomm_send(buf->app_sock, &header, send_msg);
+ free(send_msg);
+ if (result <= 0) {
+ ERR("ustcomm_send failed.");
+ return NULL;
+ }
+ result = ustcomm_recv_fd(buf->app_sock, &header, NULL, &buf->pipe_fd);
+ if (result <= 0) {
+ ERR("ustcomm_recv_fd failed");
+ return NULL;
+ } else {
+ struct stat temp;
+ fstat(buf->pipe_fd, &temp);
+ if (!S_ISFIFO(temp.st_mode)) {
+ ERR("Didn't receive a fifo from the app");
+ return NULL;
+ }
+ }
if(instance->callbacks->on_open_buffer)
instance->callbacks->on_open_buffer(instance->callbacks, buf);
{
int result;
- result = ustcomm_close_app(buf->conn);
+ result = close(buf->app_sock);
if(result == -1) {
WARN("problem calling ustcomm_close_app");
}
if(callbacks->on_close_buffer)
callbacks->on_close_buffer(callbacks, buf);
- free(buf->conn);
free(buf);
}
int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
{
- int result;
+ int result, read_result;
+ char read_buf;
pthread_cleanup_push(decrement_active_buffers, instance);
for(;;) {
+ read_result = read(buf->pipe_fd, &read_buf, 1);
/* get the subbuffer */
- result = get_subbuffer(buf);
- if(result == -1) {
- ERR("error getting subbuffer");
- continue;
- }
- else if(result == GET_SUBBUF_DONE) {
- /* this is done */
- break;
- }
- else if(result == GET_SUBBUF_DIED) {
+ if (read_result == 1) {
+ result = get_subbuffer(buf);
+ if(result == -1) {
+ ERR("error getting subbuffer");
+ continue;
+ } else if (result == GET_SUBBUF_DIED) {
+ finish_consuming_dead_subbuffer(instance->callbacks, buf);
+ break;
+ }
+ } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) ||
+ result == 0) {
+ DBG("App died while being traced");
finish_consuming_dead_subbuffer(instance->callbacks, buf);
break;
}
return 0;
}
+static void process_client_cmd(char *recvbuf, struct libustd_instance *instance)
+{
+ if(!strncmp(recvbuf, "collect", 7)) {
+ pid_t pid;
+ char *bufname = NULL;
+ int result;
+
+ result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
+ if (result != 2) {
+ ERR("parsing error: %s", recvbuf);
+ goto free_bufname;
+ }
+
+ result = start_consuming_buffer(instance, pid, bufname);
+ if (result < 0) {
+ ERR("error in add_buffer");
+ goto free_bufname;
+ }
+
+ free_bufname:
+ if (bufname) {
+ free(bufname);
+ }
+ } else if(!strncmp(recvbuf, "exit", 4)) {
+ /* Only there to force poll to return */
+ } else {
+ WARN("unknown command: %s", recvbuf);
+ }
+}
+
+#define MAX_EVENTS 10
int libustd_start_instance(struct libustd_instance *instance)
{
- int result;
- int timeout = -1;
+ struct ustcomm_sock *epoll_sock;
+ struct epoll_event events[MAX_EVENTS];
+ struct sockaddr addr;
+ int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
if(!instance->is_init) {
ERR("libustd instance not initialized");
return 1;
}
+ epoll_fd = instance->epoll_fd;
+
+ timeout = -1;
/* app loop */
for(;;) {
- char *recvbuf;
-
- /* check for requests on our public socket */
- result = ustcomm_ustd_recv_message(instance->comm, &recvbuf, NULL, timeout);
- if(result == -1 && errno == EINTR) {
+ nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
+ if (nfds == -1 && errno == EINTR) {
/* Caught signal */
+ } else if (nfds == -1) {
+ PERROR("libustd_start_instance: epoll_wait failed");
+ continue;
}
- else if(result == -1) {
- ERR("error in ustcomm_ustd_recv_message");
- goto loop_end;
- }
- else if(result > 0) {
- if(!strncmp(recvbuf, "collect", 7)) {
- pid_t pid;
- char *bufname;
- int result;
-
- result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
- if(result != 2) {
- ERR("parsing error: %s", recvbuf);
- goto free_bufname;
- }
- result = start_consuming_buffer(instance, pid, bufname);
- if(result < 0) {
- ERR("error in add_buffer");
- goto free_bufname;
+ for (i = 0; i < nfds; ++i) {
+ epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
+ if (epoll_sock == instance->listen_sock) {
+ addr_size = sizeof(struct sockaddr);
+ accept_fd = accept(epoll_sock->fd,
+ &addr,
+ (socklen_t *)&addr_size);
+ if (accept_fd == -1) {
+ PERROR("libustd_start_instance: "
+ "accept failed");
+ continue;
+ }
+ ustcomm_init_sock(accept_fd, epoll_fd,
+ &instance->connections);
+ } else {
+ char *msg = NULL;
+ result = recv_message_conn(epoll_sock->fd, &msg);
+ if (result == 0) {
+ ustcomm_del_sock(epoll_sock, 0);
+ } else if (msg) {
+ process_client_cmd(msg, instance);
+ free(msg);
}
- free_bufname:
- free(bufname);
- }
- else if(!strncmp(recvbuf, "exit", 4)) {
- /* Only there to force poll to return */
}
- else {
- WARN("unknown command: %s", recvbuf);
- }
-
- free(recvbuf);
}
- loop_end:
-
- if(instance->quit_program) {
+ if (instance->quit_program) {
pthread_mutex_lock(&instance->mutex);
if(instance->active_buffers == 0) {
pthread_mutex_unlock(&instance->mutex);
return 0;
}
+/* FIXME: threads and connections !? */
void libustd_delete_instance(struct libustd_instance *instance)
{
- if(instance->is_init)
- ustcomm_fini_ustd(instance->comm);
+ if (instance->is_init) {
+ ustcomm_del_named_sock(instance->listen_sock, 0);
+ close(instance->epoll_fd);
+ }
pthread_mutex_destroy(&instance->mutex);
free(instance->sock_path);
- free(instance->comm);
free(instance);
}
return 0;
}
-struct libustd_instance *libustd_new_instance(
- struct libustd_callbacks *callbacks, char *sock_path)
+struct libustd_instance
+*libustd_new_instance(struct libustd_callbacks *callbacks,
+ char *sock_path)
{
struct libustd_instance *instance =
zmalloc(sizeof(struct libustd_instance));
- if(!instance)
- return NULL;
-
- instance->comm = malloc(sizeof(struct ustcomm_ustd));
- if(!instance->comm) {
- free(instance);
+ if(!instance) {
return NULL;
}
instance->active_buffers = 0;
pthread_mutex_init(&instance->mutex, NULL);
- if(sock_path)
+ if (sock_path) {
instance->sock_path = strdup(sock_path);
- else
+ } else {
instance->sock_path = NULL;
+ }
return instance;
}
+static int init_ustd_socket(struct libustd_instance *instance)
+{
+ char *name;
+
+ if (instance->sock_path) {
+ if (asprintf(&name, "%s", instance->sock_path) < 0) {
+ ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
+ instance->sock_path);
+ return -1;
+ }
+ } else {
+ int result;
+
+ /* Only check if socket dir exists if we are using the default directory */
+ result = ensure_dir_exists(SOCK_DIR);
+ if (result == -1) {
+ ERR("Unable to create socket directory %s", SOCK_DIR);
+ return -1;
+ }
+
+ if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) {
+ ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
+ SOCK_DIR);
+ return -1;
+ }
+ }
+
+ /* Set up epoll */
+ instance->epoll_fd = epoll_create(MAX_EVENTS);
+ if (instance->epoll_fd == -1) {
+ ERR("epoll_create failed, start instance bailing");
+ goto free_name;
+ }
+
+ /* Create the named socket */
+ instance->listen_sock = ustcomm_init_named_socket(name,
+ instance->epoll_fd);
+ if(!instance->listen_sock) {
+ ERR("error initializing named socket at %s", name);
+ goto close_epoll;
+ }
+
+ INIT_LIST_HEAD(&instance->connections);
+
+ free(name);
+
+ return 0;
+
+close_epoll:
+ close(instance->epoll_fd);
+free_name:
+ free(name);
+
+ return -1;
+}
+
int libustd_init_instance(struct libustd_instance *instance)
{
int result;
- result = ustcomm_init_ustd(instance->comm, instance->sock_path);
+ result = init_ustd_socket(instance);
if(result == -1) {
ERR("failed to initialize socket");
return 1;