X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=ustd%2Fustd.c;h=8205ed2b2b5e3c77cc6fa9ecae56e2c2ef7ed0ae;hb=3a7b90de71f2a82f73f06fb14a7b77805aea1064;hp=19981191e2d9d17be606d9358570118a6e36cc11;hpb=46ef48cdf8b64608a4f679500bc34293b9f0b649;p=ust.git diff --git a/ustd/ustd.c b/ustd/ustd.c index 1998119..8205ed2 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -32,86 +33,10 @@ struct buffer_info { long consumed_old; }; -int add_buffer(pid_t pid, char *bufname) -{ - struct buffer_info *buf; - char *send_msg; - char *received_msg; - int result; - char *tmp; - int fd; - - buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); - if(buf == NULL) { - ERR("add_buffer: insufficient memory"); - return -1; - } - - buf->name = bufname; - buf->pid = pid; - - /* get shmid */ - asprintf(&send_msg, "get_shmid %s", buf->name); - send_message(pid, send_msg, &received_msg); - free(send_msg); - DBG("got buffer name %s", buf->name); - - result = sscanf(received_msg, "%d", &buf->shmid); - if(result != 1) { - ERR("unable to parse response to get_shmid"); - return -1; - } - free(received_msg); - DBG("got shmid %d", buf->shmid); - - /* get n_subbufs */ - asprintf(&send_msg, "get_n_subbufs %s", buf->name); - send_message(pid, send_msg, &received_msg); - free(send_msg); - - result = sscanf(received_msg, "%d", &buf->n_subbufs); - if(result != 1) { - ERR("unable to parse response to get_n_subbufs"); - return -1; - } - free(received_msg); - DBG("got n_subbufs %d", buf->n_subbufs); - - /* get subbuf size */ - asprintf(&send_msg, "get_subbuf_size %s", buf->name); - send_message(pid, send_msg, &received_msg); - free(send_msg); - - result = sscanf(received_msg, "%d", &buf->subbuf_size); - if(result != 1) { - ERR("unable to parse response to get_subbuf_size"); - return -1; - } - free(received_msg); - DBG("got subbuf_size %d", buf->subbuf_size); - - /* attach memory */ - buf->mem = shmat(buf->shmid, NULL, 0); - if(buf->mem == (void *) 0) { - perror("shmat"); - return -1; - } - DBG("successfully attached memory"); - - /* open file for output */ - asprintf(&tmp, "/tmp/trace/%s_0", buf->name); - result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600); - if(result == -1) { - PERROR("open"); - return -1; - } - buf->file_fd = fd; - free(tmp); - - list_add(&buf->list, &buffers); - - return 0; -} +/* return value: 0 = subbuffer is finished, it won't produce data anymore + * 1 = got subbuffer successfully + * <0 = error + */ int get_subbuffer(struct buffer_info *buf) { @@ -130,21 +55,27 @@ int get_subbuffer(struct buffer_info *buf) free(send_msg); result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old); - if(result != 2) { + if(result != 2 && result != 1) { ERR("unable to parse response to get_subbuffer"); return -1; } - free(received_msg); + + DBG("received msg is %s", received_msg); if(!strcmp(rep_code, "OK")) { DBG("got subbuffer %s", buf->name); retval = 1; } + else if(nth_token_is(received_msg, "END", 0) == 1) { + return 0; + } else { - DBG("did not get subbuffer %s", buf->name); - retval = 0; + DBG("error getting subbuffer %s", buf->name); + retval = -1; } + /* FIMXE: free correctly the stuff */ + free(received_msg); free(rep_code); return retval; } @@ -205,6 +136,125 @@ ssize_t patient_write(int fd, const void *buf, size_t count) return bufc-(const char *)buf; } +void *consumer_thread(void *arg) +{ + struct buffer_info *buf = (struct buffer_info *) arg; + int result; + + for(;;) { + result = get_subbuffer(buf); + if(result == -1) { + ERR("error getting subbuffer"); + continue; + } + if(result == 0) { + /* this is done */ + break; + } + + /* write data to file */ + result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size); + if(result == -1) { + PERROR("write"); + /* FIXME: maybe drop this trace */ + } + + result = put_subbuffer(buf); + if(result == -1) { + ERR("error putting subbuffer"); + break; + } + } + + DBG("thread for buffer %s is stopping", buf->name); + + return NULL; +} + +int add_buffer(pid_t pid, char *bufname) +{ + struct buffer_info *buf; + char *send_msg; + char *received_msg; + int result; + char *tmp; + int fd; + pthread_t thr; + + buf = (struct buffer_info *) malloc(sizeof(struct buffer_info)); + if(buf == NULL) { + ERR("add_buffer: insufficient memory"); + return -1; + } + + buf->name = bufname; + buf->pid = pid; + + /* get shmid */ + asprintf(&send_msg, "get_shmid %s", buf->name); + send_message(pid, send_msg, &received_msg); + free(send_msg); + DBG("got buffer name %s", buf->name); + + result = sscanf(received_msg, "%d", &buf->shmid); + if(result != 1) { + ERR("unable to parse response to get_shmid"); + return -1; + } + free(received_msg); + DBG("got shmid %d", buf->shmid); + + /* get n_subbufs */ + asprintf(&send_msg, "get_n_subbufs %s", buf->name); + send_message(pid, send_msg, &received_msg); + free(send_msg); + + result = sscanf(received_msg, "%d", &buf->n_subbufs); + if(result != 1) { + ERR("unable to parse response to get_n_subbufs"); + return -1; + } + free(received_msg); + DBG("got n_subbufs %d", buf->n_subbufs); + + /* get subbuf size */ + asprintf(&send_msg, "get_subbuf_size %s", buf->name); + send_message(pid, send_msg, &received_msg); + free(send_msg); + + result = sscanf(received_msg, "%d", &buf->subbuf_size); + if(result != 1) { + ERR("unable to parse response to get_subbuf_size"); + return -1; + } + free(received_msg); + DBG("got subbuf_size %d", buf->subbuf_size); + + /* attach memory */ + buf->mem = shmat(buf->shmid, NULL, 0); + if(buf->mem == (void *) 0) { + perror("shmat"); + return -1; + } + DBG("successfully attached memory"); + + /* open file for output */ + asprintf(&tmp, "/tmp/trace/%s_0", buf->name); + result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600); + if(result == -1) { + PERROR("open"); + return -1; + } + buf->file_fd = fd; + free(tmp); + + //list_add(&buf->list, &buffers); + + pthread_create(&thr, NULL, consumer_thread, buf); + + return 0; +} + int main(int argc, char **argv) { struct ustcomm_ustd ustd; @@ -219,9 +269,8 @@ int main(int argc, char **argv) /* app loop */ for(;;) { char *recvbuf; - struct buffer_info *buf; - /* 1. check for requests on our public socket */ + /* check for requests on our public socket */ result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100); if(result == -1) { ERR("error in ustcomm_ustd_recv_message"); @@ -247,30 +296,6 @@ int main(int argc, char **argv) free(recvbuf); } - - /* 2. try to consume data from tracing apps */ - list_for_each_entry(buf, &buffers, list) { - result = get_subbuffer(buf); - if(result == -1) { - ERR("error getting subbuffer"); - continue; - } - if(result == 0) - continue; - - /* write data to file */ - //result = write(buf->file_fd, buf->, ); - result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size); - if(result == -1) { - PERROR("write"); - /* FIXME: maybe drop this trace */ - } - - result = put_subbuffer(buf); - if(result == -1) { - ERR("error putting subbuffer"); - } - } } return 0;