X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=ustd%2Fustd.c;h=8eeb62ba361aa0a7696b7540a827fb24b6cafb4e;hb=750f9da42a4d6e09b068ca959977707b20c98f36;hp=7882e34544278f4cd68a08303670e8b8d35d75ea;hpb=2ddb81a86943f0ca84b92ae4a1cf144098a9d8bd;p=ust.git diff --git a/ustd/ustd.c b/ustd/ustd.c index 7882e34..8eeb62b 100644 --- a/ustd/ustd.c +++ b/ustd/ustd.c @@ -86,11 +86,10 @@ int get_subbuffer(struct buffer_info *buf) if(result != 2 && result != 1) { ERR("unable to parse response to get_subbuffer"); retval = -1; + free(received_msg); goto end_rep; } - DBG("received msg is %s", received_msg); - if(!strcmp(rep_code, "OK")) { DBG("got subbuffer %s", buf->name); retval = GET_SUBBUF_OK; @@ -260,6 +259,9 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) ERR("problem in ustcomm_send_request(get_pidunique)"); return NULL; } + if(result == 0) { + goto error; + } result = sscanf(received_msg, "%lld", &buf->pidunique); if(result != 1) { @@ -277,6 +279,9 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) ERR("problem in ustcomm_send_request(get_shmid)"); return NULL; } + if(result == 0) { + goto error; + } result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid); if(result != 2) { @@ -294,6 +299,9 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) ERR("problem in ustcomm_send_request(g_n_subbufs)"); return NULL; } + if(result == 0) { + goto error; + } result = sscanf(received_msg, "%d", &buf->n_subbufs); if(result != 1) { @@ -305,8 +313,15 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) /* get subbuf size */ asprintf(&send_msg, "get_subbuf_size %s", buf->name); - ustcomm_send_request(&buf->conn, send_msg, &received_msg); + result = ustcomm_send_request(&buf->conn, send_msg, &received_msg); free(send_msg); + if(result == -1) { + ERR("problem in ustcomm_send_request(get_subbuf_size)"); + return NULL; + } + if(result == 0) { + goto error; + } result = sscanf(received_msg, "%d", &buf->subbuf_size); if(result != 1) { @@ -378,6 +393,56 @@ struct buffer_info *connect_buffer(pid_t pid, const char *bufname) pthread_mutex_unlock(&active_buffers_mutex); return buf; + +error: + free(buf); + return NULL; +} + +static void destroy_buffer(struct buffer_info *buf) +{ + int result; + + result = ustcomm_close_app(&buf->conn); + if(result == -1) { + WARN("problem calling ustcomm_close_app"); + } + + result = shmdt(buf->mem); + if(result == -1) { + PERROR("shmdt"); + } + + result = shmdt(buf->bufstruct_mem); + if(result == -1) { + PERROR("shmdt"); + } + + result = close(buf->file_fd); + if(result == -1) { + PERROR("close"); + } + + free(buf); +} + +int unwrite_last_subbuffer(struct buffer_info *buf) +{ + int result; + + result = ftruncate(buf->file_fd, buf->previous_offset); + if(result == -1) { + PERROR("ftruncate"); + return -1; + } + + result = lseek(buf->file_fd, buf->previous_offset, SEEK_SET); + if(result == (int)(off_t)-1) { + PERROR("lseek"); + return -1; + } + + return 0; } int write_current_subbuffer(struct buffer_info *buf) @@ -388,11 +453,20 @@ int write_current_subbuffer(struct buffer_info *buf) size_t cur_sb_size = subbuffer_data_size(subbuf_mem); + off_t cur_offset = lseek(buf->file_fd, 0, SEEK_CUR); + if(cur_offset == (off_t)-1) { + PERROR("lseek"); + return -1; + } + + buf->previous_offset = cur_offset; + DBG("previous_offset: %ld", cur_offset); + result = patient_write(buf->file_fd, subbuf_mem, cur_sb_size); if(result == -1) { PERROR("write"); /* FIXME: maybe drop this trace */ - return 0; + return -1; } return 0; @@ -425,7 +499,6 @@ int consumer_loop(struct buffer_info *buf) /* FIXME: handle return value? */ /* put the subbuffer */ - /* FIXME: we actually should unput the buffer before consuming... */ result = put_subbuffer(buf); if(result == -1) { ERR("unknown error putting subbuffer (channel=%s)", buf->name); @@ -437,7 +510,10 @@ int consumer_loop(struct buffer_info *buf) } else if(result == PUT_SUBBUF_DIED) { WARN("application died while putting subbuffer"); - /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */ + /* Skip the first subbuffer. We are not sure it is trustable + * because the put_subbuffer() did not complete. + */ + unwrite_last_subbuffer(buf); finish_consuming_dead_subbuffer(buf); break; } @@ -463,10 +539,6 @@ int consumer_loop(struct buffer_info *buf) return 0; } -void free_buffer(struct buffer_info *buf) -{ -} - struct consumer_thread_args { pid_t pid; const char *bufname; @@ -487,7 +559,8 @@ void *consumer_thread(void *arg) consumer_loop(buf); - free_buffer(buf); + free(args->bufname); + destroy_buffer(buf); end: /* bufname is free'd in free_buffer() */ @@ -499,6 +572,7 @@ int start_consuming_buffer(pid_t pid, const char *bufname) { pthread_t thr; struct consumer_thread_args *args; + int result; DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname); @@ -508,7 +582,16 @@ int start_consuming_buffer(pid_t pid, const char *bufname) args->bufname = strdup(bufname); DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); - pthread_create(&thr, NULL, consumer_thread, args); + result = pthread_create(&thr, NULL, consumer_thread, args); + if(result == -1) { + ERR("pthread_create failed"); + return -1; + } + result = pthread_detach(thr); + if(result == -1) { + ERR("pthread_detach failed"); + return -1; + } DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname); return 0; @@ -624,6 +707,11 @@ int start_ustd(int fd) PERROR("sigaction"); return 1; } + result = sigaction(SIGINT, &sa, NULL); + if(result == -1) { + PERROR("sigaction"); + return 1; + } result = ustcomm_init_ustd(&ustd, sock_path); if(result == -1) { @@ -706,6 +794,9 @@ int start_ustd(int fd) free_bufname: free(bufname); } + else { + WARN("unknown command: %s", recvbuf); + } free(recvbuf); } @@ -722,6 +813,8 @@ int start_ustd(int fd) } } + ustcomm_fini_ustd(&ustd); + return 0; }