ust: first try at blocking support for consumer
[ust.git] / libtracing / relay.c
index 16f5322e1b7b7f42c7268c3374782ce7e3ea6926..0133f3e3bb5cc8ae57073fd6b6d541b96c1cfc24 100644 (file)
@@ -97,17 +97,16 @@ static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
 
        void *ptr;
        int result;
-       int shmid;
 
        *size = PAGE_ALIGN(*size);
 
-       result = shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
-       if(shmid == -1) {
+       result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
+       if(buf->shmid == -1) {
                PERROR("shmget");
                return -1;
        }
 
-       ptr = shmat(shmid, NULL, 0);
+       ptr = shmat(buf->shmid, NULL, 0);
        if(ptr == (void *) -1) {
                perror("shmat");
                goto destroy_shmem;
@@ -116,7 +115,7 @@ static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
        /* Already mark the shared memory for destruction. This will occur only
          * when all users have detached.
         */
-       result = shmctl(shmid, IPC_RMID, NULL);
+       result = shmctl(buf->shmid, IPC_RMID, NULL);
        if(result == -1) {
                perror("shmctl");
                return -1;
@@ -128,7 +127,7 @@ static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
        return 0;
 
        destroy_shmem:
-       result = shmctl(shmid, IPC_RMID, NULL);
+       result = shmctl(buf->shmid, IPC_RMID, NULL);
        if(result == -1) {
                perror("shmctl");
        }
@@ -874,6 +873,19 @@ static notrace void ltt_buffer_end_callback(struct rchan_buf *buf,
 
 }
 
+void (*wake_consumer)(void *, int) = NULL;
+
+void relay_set_wake_consumer(void (*wake)(void *, int))
+{
+       wake_consumer = wake;
+}
+
+void relay_wake_consumer(void *arg, int finished)
+{
+       if(wake_consumer)
+               wake_consumer(arg, finished);
+}
+
 static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
                void *subbuf)
 {
@@ -881,7 +893,9 @@ static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
                (struct ltt_channel_struct *)buf->chan->private_data;
        struct ltt_channel_buf_struct *ltt_buf = channel->buf;
 
-       atomic_set(&ltt_buf->wakeup_readers, 1);
+       if(ltt_buf->call_wake_consumer)
+               relay_wake_consumer(ACCESS_ONCE(ltt_buf->wake_consumer_arg), 0);
+//ust//        atomic_set(&ltt_buf->wakeup_readers, 1);
 }
 
 static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf)
@@ -1085,14 +1099,14 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_
        consumed_old = consumed_old | uconsumed_old;
        consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
 
-       spin_lock(&ltt_buf->full_lock);
+//ust//        spin_lock(&ltt_buf->full_lock);
        if (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
                                consumed_new)
            != consumed_old) {
                /* We have been pushed by the writer : the last
                 * buffer read _is_ corrupted! It can also
                 * happen if this is a buffer we never got. */
-               spin_unlock(&ltt_buf->full_lock);
+//ust//                spin_unlock(&ltt_buf->full_lock);
                return -EIO;
        } else {
                /* tell the client that buffer is now unfull */
@@ -1101,7 +1115,7 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_
                index = SUBBUF_INDEX(consumed_old, buf->chan);
                data = BUFFER_OFFSET(consumed_old, buf->chan);
                ltt_buf_unfull(buf, index, data);
-               spin_unlock(&ltt_buf->full_lock);
+//ust//                spin_unlock(&ltt_buf->full_lock);
        }
        return 0;
 }
@@ -1456,8 +1470,8 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
        for (j = 0; j < n_subbufs; j++)
                local_set(&ltt_buf->commit_count[j], 0);
 //ust//        init_waitqueue_head(&ltt_buf->write_wait);
-       atomic_set(&ltt_buf->wakeup_readers, 0);
-       spin_lock_init(&ltt_buf->full_lock);
+//ust//        atomic_set(&ltt_buf->wakeup_readers, 0);
+//ust//        spin_lock_init(&ltt_buf->full_lock);
 
        ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
 
@@ -1466,6 +1480,9 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
        local_set(&ltt_buf->events_lost, 0);
        local_set(&ltt_buf->corrupted_subbuffers, 0);
 
+       ltt_buf->call_wake_consumer = 0;
+       ltt_buf->wake_consumer_arg = NULL;
+
        return 0;
 }
 
@@ -1573,8 +1590,14 @@ static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
  */
 static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
 {
+       struct ltt_channel_struct *channel =
+               (struct ltt_channel_struct *)buf->chan->private_data;
+       struct ltt_channel_buf_struct *ltt_buf = channel->buf;
+
        buf->finalized = 1;
        ltt_force_switch(buf, FORCE_FLUSH);
+
+       relay_wake_consumer(ltt_buf, 1);
 }
 
 static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
This page took 0.027984 seconds and 4 git commands to generate.