X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libtracing%2Frelay.c;h=88bdfefc487d5dae3239aa38984a5a6c5150dbbe;hb=3a7b90de71f2a82f73f06fb14a7b77805aea1064;hp=14724a8e70ad0df8846f6932e465289581190eaf;hpb=bb07823d7dc174f7bd96d30843acfac7424dd0d2;p=ust.git diff --git a/libtracing/relay.c b/libtracing/relay.c index 14724a8..88bdfef 100644 --- a/libtracing/relay.c +++ b/libtracing/relay.c @@ -22,8 +22,10 @@ //ust// #include //ust// #include //ust// #include -#include #include "kernelcompat.h" +#include +#include +#include #include "list.h" #include "relay.h" #include "channels.h" @@ -36,6 +38,9 @@ static DEFINE_MUTEX(relay_channels_mutex); static LIST_HEAD(relay_channels); + +static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf); + /** * relay_alloc_buf - allocate a channel buffer * @buf: the buffer struct @@ -90,21 +95,44 @@ static int relay_alloc_buf(struct rchan_buf *buf, size_t *size) unsigned int n_pages; struct buf_page *buf_page, *n; - void *result; + void *ptr; + int result; *size = PAGE_ALIGN(*size); - /* Maybe do read-ahead */ - result = mmap(NULL, *size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS, -1, 0); - if(result == MAP_FAILED) { - PERROR("mmap"); + result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700); + if(buf->shmid == -1) { + PERROR("shmget"); + return -1; + } + + ptr = shmat(buf->shmid, NULL, 0); + if(ptr == (void *) -1) { + perror("shmat"); + goto destroy_shmem; + } + + /* Already mark the shared memory for destruction. This will occur only + * when all users have detached. + */ + result = shmctl(buf->shmid, IPC_RMID, NULL); + if(result == -1) { + perror("shmctl"); return -1; } - buf->buf_data = result; + buf->buf_data = ptr; buf->buf_size = *size; return 0; + + destroy_shmem: + result = shmctl(buf->shmid, IPC_RMID, NULL); + if(result == -1) { + perror("shmctl"); + } + + return -1; } /** @@ -178,7 +206,7 @@ static void relay_destroy_buf(struct rchan_buf *buf) static void relay_remove_buf(struct kref *kref) { struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref); - buf->chan->cb->remove_buf_file(buf); +//ust// buf->chan->cb->remove_buf_file(buf); relay_destroy_buf(buf); } @@ -266,6 +294,9 @@ static struct rchan_buf *relay_open_buf(struct rchan *chan) /* Create file in fs */ //ust// dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR, //ust// buf); + + ltt_create_buf_file_callback(buf); // ust // + //ust// if (!dentry) //ust// goto free_buf; //ust// @@ -631,6 +662,7 @@ void *ltt_relay_offset_address(struct rchan_buf *buf, size_t offset) //ust// buf->hpage[odd] = page = buf->wpage; //ust// page = ltt_relay_cache_page(buf, &buf->hpage[odd], page, offset); //ust// return page_address(page->page) + (offset & ~PAGE_MASK); + return ((char *)buf->buf_data)+offset; return NULL; } //ust// EXPORT_SYMBOL_GPL(ltt_relay_offset_address); @@ -741,37 +773,6 @@ void *ltt_relay_offset_address(struct rchan_buf *buf, size_t offset) #define printk_dbg(fmt, args...) #endif -/* LTTng lockless logging buffer info */ -struct ltt_channel_buf_struct { - /* First 32 bytes cache-hot cacheline */ - local_t offset; /* Current offset in the buffer */ - local_t *commit_count; /* Commit count per sub-buffer */ - atomic_long_t consumed; /* - * Current offset in the buffer - * standard atomic access (shared) - */ - unsigned long last_tsc; /* - * Last timestamp written in the buffer. - */ - /* End of first 32 bytes cacheline */ - atomic_long_t active_readers; /* - * Active readers count - * standard atomic access (shared) - */ - local_t events_lost; - local_t corrupted_subbuffers; - spinlock_t full_lock; /* - * buffer full condition spinlock, only - * for userspace tracing blocking mode - * synchronization with reader. - */ -//ust// wait_queue_head_t write_wait; /* -//ust// * Wait queue for blocking user space -//ust// * writers -//ust// */ - atomic_t wakeup_readers; /* Boolean : wakeup readers waiting ? */ -} ____cacheline_aligned; - /* * Last TSC comparison functions. Check if the current TSC overflows * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc @@ -867,8 +868,22 @@ static notrace void ltt_buffer_end_callback(struct rchan_buf *buf, header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset), buf->chan); header->cycle_count_end = tsc; - header->events_lost = ltt_buf->events_lost; - header->subbuf_corrupt = ltt_buf->corrupted_subbuffers; + header->events_lost = local_read(<t_buf->events_lost); + header->subbuf_corrupt = local_read(<t_buf->corrupted_subbuffers); + +} + +void (*wake_consumer)(void *, int) = NULL; + +void relay_set_wake_consumer(void (*wake)(void *, int)) +{ + wake_consumer = wake; +} + +void relay_wake_consumer(void *arg, int finished) +{ + if(wake_consumer) + wake_consumer(arg, finished); } static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx, @@ -877,13 +892,17 @@ static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx, struct ltt_channel_struct *channel = (struct ltt_channel_struct *)buf->chan->private_data; struct ltt_channel_buf_struct *ltt_buf = channel->buf; + int result; - atomic_set(<t_buf->wakeup_readers, 1); + result = write(ltt_buf->data_ready_fd_write, "1", 1); + if(result == -1) { + PERROR("write (in ltt_relay_buffer_flush)"); + ERR("this should never happen!"); + } +//ust// atomic_set(<t_buf->wakeup_readers, 1); } -static struct dentry *ltt_create_buf_file_callback(const char *filename, - struct dentry *parent, int mode, - struct rchan_buf *buf) +static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf) { struct ltt_channel_struct *ltt_chan; int err; @@ -899,6 +918,7 @@ static struct dentry *ltt_create_buf_file_callback(const char *filename, //ust// if (!dentry) //ust// goto error; //ust// return dentry; + return NULL; //ust// //ust//error: ltt_relay_destroy_buffer(ltt_chan); return NULL; @@ -1034,7 +1054,7 @@ static notrace void ltt_buf_unfull(struct rchan_buf *buf, //ust// return mask; //ust// } -static int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old) +int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old) { struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data; long consumed_old, consumed_idx, commit_count, write_offset; @@ -1074,7 +1094,7 @@ static int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struc return 0; } -static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old) +int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old) { long consumed_new, consumed_old; @@ -1083,14 +1103,14 @@ static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struc consumed_old = consumed_old | uconsumed_old; consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan); - spin_lock(<t_buf->full_lock); +//ust// spin_lock(<t_buf->full_lock); if (atomic_long_cmpxchg(<t_buf->consumed, consumed_old, consumed_new) != consumed_old) { /* We have been pushed by the writer : the last * buffer read _is_ corrupted! It can also * happen if this is a buffer we never got. */ - spin_unlock(<t_buf->full_lock); +//ust// spin_unlock(<t_buf->full_lock); return -EIO; } else { /* tell the client that buffer is now unfull */ @@ -1099,7 +1119,7 @@ static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struc index = SUBBUF_INDEX(consumed_old, buf->chan); data = BUFFER_OFFSET(consumed_old, buf->chan); ltt_buf_unfull(buf, index, data); - spin_unlock(<t_buf->full_lock); +//ust// spin_unlock(<t_buf->full_lock); } return 0; } @@ -1440,29 +1460,39 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, { struct ltt_channel_buf_struct *ltt_buf = ltt_chan->buf; unsigned int j; + int fds[2]; + int result; ltt_buf->commit_count = - malloc(sizeof(ltt_buf->commit_count) * n_subbufs); + zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs); if (!ltt_buf->commit_count) return -ENOMEM; kref_get(&trace->kref); kref_get(&trace->ltt_transport_kref); kref_get(<t_chan->kref); - ltt_buf->offset = ltt_subbuffer_header_size(); + local_set(<t_buf->offset, ltt_subbuffer_header_size()); atomic_long_set(<t_buf->consumed, 0); atomic_long_set(<t_buf->active_readers, 0); for (j = 0; j < n_subbufs; j++) local_set(<t_buf->commit_count[j], 0); //ust// init_waitqueue_head(<t_buf->write_wait); - atomic_set(<t_buf->wakeup_readers, 0); - spin_lock_init(<t_buf->full_lock); +//ust// atomic_set(<t_buf->wakeup_readers, 0); +//ust// spin_lock_init(<t_buf->full_lock); ltt_buffer_begin_callback(buf, trace->start_tsc, 0); - ltt_buf->commit_count[0] += ltt_subbuffer_header_size(); + local_add(ltt_subbuffer_header_size(), <t_buf->commit_count[0]); + + local_set(<t_buf->events_lost, 0); + local_set(<t_buf->corrupted_subbuffers, 0); - ltt_buf->events_lost = 0; - ltt_buf->corrupted_subbuffers = 0; + result = pipe(fds); + if(result == -1) { + PERROR("pipe"); + return -1; + } + ltt_buf->data_ready_fd_read = fds[0]; + ltt_buf->data_ready_fd_write = fds[1]; return 0; } @@ -1571,8 +1601,19 @@ static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace) */ static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf) { + struct ltt_channel_struct *channel = + (struct ltt_channel_struct *)buf->chan->private_data; + struct ltt_channel_buf_struct *ltt_buf = channel->buf; + int result; + buf->finalized = 1; ltt_force_switch(buf, FORCE_FLUSH); + + result = write(ltt_buf->data_ready_fd_write, "1", 1); + if(result == -1) { + PERROR("write (in ltt_relay_buffer_flush)"); + ERR("this should never happen!"); + } } static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel) @@ -1594,11 +1635,20 @@ static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel) static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel) { struct rchan *rchan = ltt_channel->trans_channel_data; + int result; if (rchan->buf) { struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf; ltt_relay_buffer_flush(rchan->buf); //ust// ltt_relay_wake_writers(ltt_buf); + /* closing the pipe tells the consumer the buffer is finished */ + + //result = write(ltt_buf->data_ready_fd_write, "D", 1); + //if(result == -1) { + // PERROR("write (in ltt_relay_finish_buffer)"); + // ERR("this should never happen!"); + //} + close(ltt_buf->data_ready_fd_write); } } @@ -2010,7 +2060,7 @@ static inline void ltt_reserve_end_switch_current( static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace, struct ltt_channel_struct *ltt_channel, void **transport_data, size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc, - unsigned int *rflags, int largest_align, int cpu) + unsigned int *rflags, int largest_align) { struct rchan *rchan = ltt_channel->trans_channel_data; struct rchan_buf *buf = *transport_data = rchan->buf; @@ -2283,7 +2333,7 @@ static int ltt_relay_user_blocking(struct ltt_trace_struct *trace, static void ltt_relay_print_user_errors(struct ltt_trace_struct *trace, unsigned int chan_index, size_t data_size, - struct user_dbg_data *dbg, int cpu) + struct user_dbg_data *dbg) { struct rchan *rchan; struct ltt_channel_buf_struct *ltt_buf; @@ -2368,9 +2418,14 @@ static struct ltt_transport ust_relay_transport = { //ust// return 0; //ust// } -void init_ustrelay_transport(void) +static char initialized = 0; + +void __attribute__((constructor)) init_ustrelay_transport(void) { - ltt_transport_register(&ust_relay_transport); + if(!initialized) { + ltt_transport_register(&ust_relay_transport); + initialized = 1; + } } static void __exit ltt_relay_exit(void)