X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libtracing%2Frelay.c;h=88bdfefc487d5dae3239aa38984a5a6c5150dbbe;hb=3a7b90de71f2a82f73f06fb14a7b77805aea1064;hp=16f5322e1b7b7f42c7268c3374782ce7e3ea6926;hpb=1ae7f0744f280e97ab1a2adc548b8fd9f2cb21a4;p=ust.git diff --git a/libtracing/relay.c b/libtracing/relay.c index 16f5322..88bdfef 100644 --- a/libtracing/relay.c +++ b/libtracing/relay.c @@ -97,17 +97,16 @@ static int relay_alloc_buf(struct rchan_buf *buf, size_t *size) void *ptr; int result; - int shmid; *size = PAGE_ALIGN(*size); - result = shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700); - if(shmid == -1) { + result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700); + if(buf->shmid == -1) { PERROR("shmget"); return -1; } - ptr = shmat(shmid, NULL, 0); + ptr = shmat(buf->shmid, NULL, 0); if(ptr == (void *) -1) { perror("shmat"); goto destroy_shmem; @@ -116,7 +115,7 @@ static int relay_alloc_buf(struct rchan_buf *buf, size_t *size) /* Already mark the shared memory for destruction. This will occur only * when all users have detached. */ - result = shmctl(shmid, IPC_RMID, NULL); + result = shmctl(buf->shmid, IPC_RMID, NULL); if(result == -1) { perror("shmctl"); return -1; @@ -128,7 +127,7 @@ static int relay_alloc_buf(struct rchan_buf *buf, size_t *size) return 0; destroy_shmem: - result = shmctl(shmid, IPC_RMID, NULL); + result = shmctl(buf->shmid, IPC_RMID, NULL); if(result == -1) { perror("shmctl"); } @@ -874,14 +873,33 @@ static notrace void ltt_buffer_end_callback(struct rchan_buf *buf, } +void (*wake_consumer)(void *, int) = NULL; + +void relay_set_wake_consumer(void (*wake)(void *, int)) +{ + wake_consumer = wake; +} + +void relay_wake_consumer(void *arg, int finished) +{ + if(wake_consumer) + wake_consumer(arg, finished); +} + static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx, void *subbuf) { struct ltt_channel_struct *channel = (struct ltt_channel_struct *)buf->chan->private_data; struct ltt_channel_buf_struct *ltt_buf = channel->buf; + int result; - atomic_set(<t_buf->wakeup_readers, 1); + result = write(ltt_buf->data_ready_fd_write, "1", 1); + if(result == -1) { + PERROR("write (in ltt_relay_buffer_flush)"); + ERR("this should never happen!"); + } +//ust// atomic_set(<t_buf->wakeup_readers, 1); } static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf) @@ -1085,14 +1103,14 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_ consumed_old = consumed_old | uconsumed_old; consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan); - spin_lock(<t_buf->full_lock); +//ust// spin_lock(<t_buf->full_lock); if (atomic_long_cmpxchg(<t_buf->consumed, consumed_old, consumed_new) != consumed_old) { /* We have been pushed by the writer : the last * buffer read _is_ corrupted! It can also * happen if this is a buffer we never got. */ - spin_unlock(<t_buf->full_lock); +//ust// spin_unlock(<t_buf->full_lock); return -EIO; } else { /* tell the client that buffer is now unfull */ @@ -1101,7 +1119,7 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_ index = SUBBUF_INDEX(consumed_old, buf->chan); data = BUFFER_OFFSET(consumed_old, buf->chan); ltt_buf_unfull(buf, index, data); - spin_unlock(<t_buf->full_lock); +//ust// spin_unlock(<t_buf->full_lock); } return 0; } @@ -1442,6 +1460,8 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, { struct ltt_channel_buf_struct *ltt_buf = ltt_chan->buf; unsigned int j; + int fds[2]; + int result; ltt_buf->commit_count = zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs); @@ -1456,8 +1476,8 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, for (j = 0; j < n_subbufs; j++) local_set(<t_buf->commit_count[j], 0); //ust// init_waitqueue_head(<t_buf->write_wait); - atomic_set(<t_buf->wakeup_readers, 0); - spin_lock_init(<t_buf->full_lock); +//ust// atomic_set(<t_buf->wakeup_readers, 0); +//ust// spin_lock_init(<t_buf->full_lock); ltt_buffer_begin_callback(buf, trace->start_tsc, 0); @@ -1466,6 +1486,14 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, local_set(<t_buf->events_lost, 0); local_set(<t_buf->corrupted_subbuffers, 0); + result = pipe(fds); + if(result == -1) { + PERROR("pipe"); + return -1; + } + ltt_buf->data_ready_fd_read = fds[0]; + ltt_buf->data_ready_fd_write = fds[1]; + return 0; } @@ -1573,8 +1601,19 @@ static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace) */ static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf) { + struct ltt_channel_struct *channel = + (struct ltt_channel_struct *)buf->chan->private_data; + struct ltt_channel_buf_struct *ltt_buf = channel->buf; + int result; + buf->finalized = 1; ltt_force_switch(buf, FORCE_FLUSH); + + result = write(ltt_buf->data_ready_fd_write, "1", 1); + if(result == -1) { + PERROR("write (in ltt_relay_buffer_flush)"); + ERR("this should never happen!"); + } } static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel) @@ -1596,11 +1635,20 @@ static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel) static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel) { struct rchan *rchan = ltt_channel->trans_channel_data; + int result; if (rchan->buf) { struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf; ltt_relay_buffer_flush(rchan->buf); //ust// ltt_relay_wake_writers(ltt_buf); + /* closing the pipe tells the consumer the buffer is finished */ + + //result = write(ltt_buf->data_ready_fd_write, "D", 1); + //if(result == -1) { + // PERROR("write (in ltt_relay_finish_buffer)"); + // ERR("this should never happen!"); + //} + close(ltt_buf->data_ready_fd_write); } }