/* Copyright (C) 2009 Pierre-Marc Fournier
+ * 2010 Alexis Halle
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
#include <sys/shm.h>
#include <fcntl.h>
#include <unistd.h>
-#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <assert.h>
#include <getopt.h>
-#include "ustd.h"
+#include "ust/ustd.h"
#include "usterr.h"
-#include "ustcomm.h"
-
-/* return value: 0 = subbuffer is finished, it won't produce data anymore
- * 1 = got subbuffer successfully
- * <0 = error
- */
-
-#define GET_SUBBUF_OK 1
-#define GET_SUBBUF_DONE 0
-#define GET_SUBBUF_DIED 2
-
-#define PUT_SUBBUF_OK 1
-#define PUT_SUBBUF_DIED 0
-#define PUT_SUBBUF_PUSHED 2
char *sock_path=NULL;
char *trace_path=NULL;
int daemon_mode = 0;
char *pidfile = NULL;
-/* Number of active buffers and the mutex to protect it. */
-int active_buffers = 0;
-pthread_mutex_t active_buffers_mutex = PTHREAD_MUTEX_INITIALIZER;
-/* Whether a request to end the program was received. */
-sig_atomic_t terminate_req = 0;
-
-int get_subbuffer(struct buffer_info *buf)
-{
- char *send_msg=NULL;
- char *received_msg=NULL;
- char *rep_code=NULL;
- int retval;
- int result;
-
- asprintf(&send_msg, "get_subbuffer %s", buf->name);
- result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
- if((result == -1 && errno == EPIPE) || result == 0) {
- DBG("app died while being traced");
- retval = GET_SUBBUF_DIED;
- goto end;
- }
- else if(result < 0) {
- ERR("get_subbuffer: ustcomm_send_request failed");
- retval = -1;
- goto end;
- }
-
- result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
- if(result != 2 && result != 1) {
- ERR("unable to parse response to get_subbuffer");
- retval = -1;
- goto end_rep;
- }
+struct libustd_instance *instance;
- DBG("received msg is %s", received_msg);
-
- if(!strcmp(rep_code, "OK")) {
- DBG("got subbuffer %s", buf->name);
- retval = GET_SUBBUF_OK;
- }
- else if(nth_token_is(received_msg, "END", 0) == 1) {
- retval = GET_SUBBUF_DONE;
- goto end_rep;
- }
- else {
- DBG("error getting subbuffer %s", buf->name);
- retval = -1;
- }
-
- /* FIMXE: free correctly the stuff */
-end_rep:
- if(rep_code)
- free(rep_code);
-end:
- if(send_msg)
- free(send_msg);
- if(received_msg)
- free(received_msg);
-
- return retval;
-}
+struct buffer_info_local {
+ /* output file */
+ int file_fd;
+ /* the offset we must truncate to, to unput the last subbuffer */
+ off_t previous_offset;
+};
-int put_subbuffer(struct buffer_info *buf)
+static int write_pidfile(const char *file_name, pid_t pid)
{
- char *send_msg=NULL;
- char *received_msg=NULL;
- char *rep_code=NULL;
- int retval;
- int result;
+ FILE *pidfp;
- asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
- result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
- if(result < 0 && errno == ECONNRESET) {
- retval = PUT_SUBBUF_DIED;
- goto end;
- }
- if(result < 0) {
- ERR("put_subbuffer: send_message failed");
- retval = -1;
- goto end;
- }
-
- result = sscanf(received_msg, "%as", &rep_code);
- if(result != 1) {
- ERR("unable to parse response to put_subbuffer");
- retval = -1;
- goto end_rep;
- }
-
- if(!strcmp(rep_code, "OK")) {
- DBG("subbuffer put %s", buf->name);
- retval = PUT_SUBBUF_OK;
- }
- else {
- DBG("put_subbuffer: received error, we were pushed");
- retval = PUT_SUBBUF_PUSHED;
- goto end_rep;
+ pidfp = fopen(file_name, "w");
+ if(!pidfp) {
+ PERROR("fopen (%s)", file_name);
+ WARN("killing child process");
+ return -1;
}
-end_rep:
- if(rep_code)
- free(rep_code);
-
-end:
- if(send_msg)
- free(send_msg);
- if(received_msg)
- free(received_msg);
+ fprintf(pidfp, "%d\n", pid);
- return retval;
-}
+ fclose(pidfp);
-void decrement_active_buffers(void *arg)
-{
- pthread_mutex_lock(&active_buffers_mutex);
- active_buffers--;
- pthread_mutex_unlock(&active_buffers_mutex);
+ return 0;
}
int create_dir_if_needed(char *dir)
return 0;
}
-int is_directory(const char *dir)
+int unwrite_last_subbuffer(struct buffer_info *buf)
{
int result;
- struct stat st;
+ struct buffer_info_local *buf_local = buf->user_data;
- result = stat(dir, &st);
+ result = ftruncate(buf_local->file_fd, buf_local->previous_offset);
if(result == -1) {
- PERROR("stat");
- return 0;
+ PERROR("ftruncate");
+ return -1;
}
- if(!S_ISDIR(st.st_mode)) {
- return 0;
+ result = lseek(buf_local->file_fd, buf_local->previous_offset, SEEK_SET);
+ if(result == (int)(off_t)-1) {
+ PERROR("lseek");
+ return -1;
}
- return 1;
+ return 0;
}
-struct buffer_info *connect_buffer(pid_t pid, const char *bufname)
+int write_current_subbuffer(struct buffer_info *buf)
{
- struct buffer_info *buf;
- char *send_msg;
- char *received_msg;
int result;
- char *tmp;
- int fd;
- struct shmid_ds shmds;
+ struct buffer_info_local *buf_local = buf->user_data;
- buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
- if(buf == NULL) {
- ERR("add_buffer: insufficient memory");
- return NULL;
- }
+ void *subbuf_mem = buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1));
- buf->name = bufname;
- buf->pid = pid;
+ size_t cur_sb_size = subbuffer_data_size(subbuf_mem);
- /* connect to app */
- result = ustcomm_connect_app(buf->pid, &buf->conn);
- if(result) {
- WARN("unable to connect to process, it probably died before we were able to connect");
- return NULL;
+ off_t cur_offset = lseek(buf_local->file_fd, 0, SEEK_CUR);
+ if(cur_offset == (off_t)-1) {
+ PERROR("lseek");
+ return -1;
}
- /* get pidunique */
- asprintf(&send_msg, "get_pidunique");
- result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
- free(send_msg);
+ buf_local->previous_offset = cur_offset;
+ DBG("previous_offset: %ld", cur_offset);
+
+ result = patient_write(buf_local->file_fd, subbuf_mem, cur_sb_size);
if(result == -1) {
- ERR("problem in ustcomm_send_request(get_pidunique)");
- return NULL;
+ PERROR("write");
+ return -1;
}
- result = sscanf(received_msg, "%lld", &buf->pidunique);
- if(result != 1) {
- ERR("unable to parse response to get_pidunique");
- return NULL;
- }
- free(received_msg);
- DBG("got pidunique %lld", buf->pidunique);
+ return 0;
+}
- /* get shmid */
- asprintf(&send_msg, "get_shmid %s", buf->name);
- result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
- free(send_msg);
- if(result == -1) {
- ERR("problem in ustcomm_send_request(get_shmid)");
- return NULL;
- }
+int on_read_subbuffer(struct libustd_callbacks *data, struct buffer_info *buf)
+{
+ return write_current_subbuffer(buf);
+}
- result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
- if(result != 2) {
- ERR("unable to parse response to get_shmid");
- return NULL;
- }
- free(received_msg);
- DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
+int on_read_partial_subbuffer(struct libustd_callbacks *data, struct buffer_info *buf,
+ long subbuf_index, unsigned long valid_length)
+{
+ struct buffer_info_local *buf_local = buf->user_data;
+ char *tmp;
+ int result;
+ unsigned long pad_size;
- /* get n_subbufs */
- asprintf(&send_msg, "get_n_subbufs %s", buf->name);
- result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
- free(send_msg);
+ result = patient_write(buf_local->file_fd, buf->mem + subbuf_index * buf->subbuf_size, valid_length);
if(result == -1) {
- ERR("problem in ustcomm_send_request(g_n_subbufs)");
- return NULL;
+ ERR("Error writing to buffer file");
+ return;
}
- result = sscanf(received_msg, "%d", &buf->n_subbufs);
- if(result != 1) {
- ERR("unable to parse response to get_n_subbufs");
- return NULL;
+ /* pad with empty bytes */
+ pad_size = PAGE_ALIGN(valid_length)-valid_length;
+ if(pad_size) {
+ tmp = zmalloc(pad_size);
+ result = patient_write(buf_local->file_fd, tmp, pad_size);
+ if(result == -1) {
+ ERR("Error writing to buffer file");
+ return;
+ }
+ free(tmp);
}
- free(received_msg);
- DBG("got n_subbufs %d", buf->n_subbufs);
-
- /* get subbuf size */
- asprintf(&send_msg, "get_subbuf_size %s", buf->name);
- ustcomm_send_request(&buf->conn, send_msg, &received_msg);
- free(send_msg);
- result = sscanf(received_msg, "%d", &buf->subbuf_size);
- if(result != 1) {
- ERR("unable to parse response to get_subbuf_size");
- return NULL;
- }
- free(received_msg);
- DBG("got subbuf_size %d", buf->subbuf_size);
+}
- /* attach memory */
- buf->mem = shmat(buf->shmid, NULL, 0);
- if(buf->mem == (void *) 0) {
- PERROR("shmat");
- return NULL;
- }
- DBG("successfully attached buffer memory");
+int on_open_buffer(struct libustd_callbacks *data, struct buffer_info *buf)
+{
+ char *tmp;
+ int result;
+ int fd;
+ struct buffer_info_local *buf_local =
+ zmalloc(sizeof(struct buffer_info_local));
- buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
- if(buf->bufstruct_mem == (void *) 0) {
- PERROR("shmat");
- return NULL;
+ if(!buf_local) {
+ ERR("could not allocate buffer_info_local struct");
+ return 1;
}
- DBG("successfully attached buffer bufstruct memory");
- /* obtain info on the memory segment */
- result = shmctl(buf->shmid, IPC_STAT, &shmds);
- if(result == -1) {
- PERROR("shmctl");
- return NULL;
- }
- buf->memlen = shmds.shm_segsz;
+ buf->user_data = buf_local;
/* open file for output */
if(!trace_path) {
result = create_dir_if_needed(USTD_DEFAULT_TRACE_PATH);
if(result == -1) {
ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH);
- return NULL;
+ return 1;
}
trace_path = USTD_DEFAULT_TRACE_PATH;
}
- asprintf(&tmp, "%s/%u_%lld", trace_path, buf->pid, buf->pidunique);
+ if (asprintf(&tmp, "%s/%u_%lld", trace_path, buf->pid, buf->pidunique) < 0) {
+ ERR("on_open_buffer : asprintf failed (%s/%u_%lld)",
+ trace_path, buf->pid, buf->pidunique);
+ return 1;
+ }
result = create_dir_if_needed(tmp);
if(result == -1) {
ERR("could not create directory %s", tmp);
free(tmp);
- return NULL;
+ return 1;
}
free(tmp);
- asprintf(&tmp, "%s/%u_%lld/%s_0", trace_path, buf->pid, buf->pidunique, buf->name);
+ if (asprintf(&tmp, "%s/%u_%lld/%s", trace_path, buf->pid, buf->pidunique, buf->name) < 0) {
+ ERR("on_open_buffer : asprintf failed (%s/%u_%lld/%s)",
+ trace_path, buf->pid, buf->pidunique, buf->name);
+ return 1;
+ }
result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL, 00600);
if(result == -1) {
PERROR("open");
ERR("failed opening trace file %s", tmp);
- return NULL;
+ return 1;
}
- buf->file_fd = fd;
+ buf_local->file_fd = fd;
free(tmp);
- pthread_mutex_lock(&active_buffers_mutex);
- active_buffers++;
- pthread_mutex_unlock(&active_buffers_mutex);
-
- return buf;
+ return 0;
}
-int consumer_loop(struct buffer_info *buf)
+int on_close_buffer(struct libustd_callbacks *data, struct buffer_info *buf)
{
- int result;
-
- pthread_cleanup_push(decrement_active_buffers, NULL);
-
- for(;;) {
- /* get the subbuffer */
- result = get_subbuffer(buf);
- if(result == -1) {
- ERR("error getting subbuffer");
- continue;
- }
- else if(result == GET_SUBBUF_DONE) {
- /* this is done */
- break;
- }
- else if(result == GET_SUBBUF_DIED) {
- finish_consuming_dead_subbuffer(buf);
- break;
- }
-
- /* write data to file */
- result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
- if(result == -1) {
- PERROR("write");
- /* FIXME: maybe drop this trace */
- }
-
- /* put the subbuffer */
- /* FIXME: we actually should unput the buffer before consuming... */
- result = put_subbuffer(buf);
- if(result == -1) {
- ERR("unknown error putting subbuffer (channel=%s)", buf->name);
- break;
- }
- else if(result == PUT_SUBBUF_PUSHED) {
- ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
- break;
- }
- else if(result == PUT_SUBBUF_DIED) {
- WARN("application died while putting subbuffer");
- /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
- finish_consuming_dead_subbuffer(buf);
- break;
- }
- else if(result == PUT_SUBBUF_OK) {
- }
+ struct buffer_info_local *buf_local = buf->user_data;
+ int result = close(buf_local->file_fd);
+ free(buf_local);
+ if(result == -1) {
+ PERROR("close");
}
-
- DBG("thread for buffer %s is stopping", buf->name);
-
- /* FIXME: destroy, unalloc... */
-
- pthread_cleanup_pop(1);
-
return 0;
}
-void free_buffer(struct buffer_info *buf)
+int on_put_error(struct libustd_callbacks *data, struct buffer_info *buf)
{
+ unwrite_last_subbuffer(buf);
}
-struct consumer_thread_args {
- pid_t pid;
- const char *bufname;
-};
-
-void *consumer_thread(void *arg)
+struct libustd_callbacks *new_callbacks()
{
- struct buffer_info *buf = (struct buffer_info *) arg;
- struct consumer_thread_args *args = (struct consumer_thread_args *) arg;
-
- DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname);
+ struct libustd_callbacks *callbacks =
+ zmalloc(sizeof(struct libustd_callbacks));
- buf = connect_buffer(args->pid, args->bufname);
- if(buf == NULL) {
- ERR("failed to connect to buffer");
- goto end;
- }
+ if(!callbacks)
+ return NULL;
- consumer_loop(buf);
+ callbacks->on_open_buffer = on_open_buffer;
+ callbacks->on_close_buffer = on_close_buffer;
+ callbacks->on_read_subbuffer = on_read_subbuffer;
+ callbacks->on_read_partial_subbuffer = on_read_partial_subbuffer;
+ callbacks->on_put_error = on_put_error;
+ callbacks->on_new_thread = NULL;
+ callbacks->on_close_thread = NULL;
+ callbacks->on_trace_end = NULL;
- free_buffer(buf);
+ return callbacks;
- end:
- /* bufname is free'd in free_buffer() */
- free(args);
- return NULL;
}
-int start_consuming_buffer(pid_t pid, const char *bufname)
+int is_directory(const char *dir)
{
- pthread_t thr;
- struct consumer_thread_args *args;
-
- DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname);
-
- args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args));
+ int result;
+ struct stat st;
- args->pid = pid;
- args->bufname = strdup(bufname);
- DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
+ result = stat(dir, &st);
+ if(result == -1) {
+ PERROR("stat");
+ return 0;
+ }
- pthread_create(&thr, NULL, consumer_thread, args);
- DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
+ if(!S_ISDIR(st.st_mode)) {
+ return 0;
+ }
- return 0;
+ return 1;
}
void usage(void)
void sigterm_handler(int sig)
{
- terminate_req = 1;
+ libustd_stop_instance(instance, 0);
}
int start_ustd(int fd)
{
- struct ustcomm_ustd ustd;
int result;
sigset_t sigset;
struct sigaction sa;
+ struct libustd_callbacks *callbacks = new_callbacks();
+ if(!callbacks) {
+ PERROR("new_callbacks");
+ return 1;
+ }
+
result = sigemptyset(&sigset);
if(result == -1) {
PERROR("sigemptyset");
}
sa.sa_handler = sigterm_handler;
sa.sa_mask = sigset;
- sa.sa_flags = SA_RESTART;
+ sa.sa_flags = 0;
result = sigaction(SIGTERM, &sa, NULL);
if(result == -1) {
PERROR("sigaction");
return 1;
}
-
- result = ustcomm_init_ustd(&ustd, sock_path);
+ result = sigaction(SIGINT, &sa, NULL);
if(result == -1) {
- ERR("failed to initialize socket");
+ PERROR("sigaction");
+ return 1;
+ }
+
+ instance = libustd_new_instance(callbacks, sock_path);
+ if(!instance) {
+ ERR("failed to create libustd instance");
+ return 1;
+ }
+
+ result = libustd_init_instance(instance);
+ if(result) {
+ ERR("failed to initialize libustd instance");
return 1;
}
return 1;
}
+ /* Write pidfile */
+ if(pidfile) {
+ result = write_pidfile(pidfile, getpid());
+ if(result == -1) {
+ ERR("failed to write pidfile");
+ return 1;
+ }
+ }
+
/* Notify parent that we are successfully started. */
if(fd != -1) {
/* write any one character */
}
}
- /* app loop */
- for(;;) {
- char *recvbuf;
-
- /* check for requests on our public socket */
- result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
- if(result == -1) {
- ERR("error in ustcomm_ustd_recv_message");
- goto loop_end;
- }
- if(result > 0) {
- if(!strncmp(recvbuf, "collect", 7)) {
- pid_t pid;
- char *bufname;
- int result;
-
- result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
- if(result != 2) {
- ERR("parsing error: %s", recvbuf);
- goto free_bufname;
- }
-
- result = start_consuming_buffer(pid, bufname);
- if(result < 0) {
- ERR("error in add_buffer");
- goto free_bufname;
- }
-
- free_bufname:
- free(bufname);
- }
-
- free(recvbuf);
- }
-
- loop_end:
+ libustd_start_instance(instance);
- if(terminate_req) {
- pthread_mutex_lock(&active_buffers_mutex);
- if(active_buffers == 0) {
- pthread_mutex_unlock(&active_buffers_mutex);
- break;
- }
- pthread_mutex_unlock(&active_buffers_mutex);
- }
- }
+ free(callbacks);
return 0;
}
}
else {
char buf;
- FILE *pidfp;
-
- /* It's important to write the file *before*
- * the parent ends, because the file may be
- * read as soon as the parent ends.
- */
- if(pidfile) {
- pidfp = fopen(pidfile, "w+");
- if(!pidfp) {
- PERROR("fopen (%s)", pidfile);
- WARN("killing child process");
- result = kill(child_pid, SIGTERM);
- if(result == -1) {
- PERROR("kill");
- }
- return -1;
- }
-
- fprintf(pidfp, "%d\n", child_pid);
- fclose(pidfp);
- }
result = read(fd[0], &buf, 1);
if(result == -1) {