Update lib ring buffer for external consumer
diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c
index 5d87782ee321e79d51e8164d0f80087369872b06..5e6d4df267d2c48d9d4c0bfa44b33202574dcea0 100644
--- a/libringbuffer/ring_buffer_frontend.c
+++ b/libringbuffer/ring_buffer_frontend.c
@@ -249,7 +249,7 @@ static void switch_buffer_timer(unsigned long data)
        /*
         * Only flush buffers periodically if readers are active.
         */
-       if (uatomic_read(&buf->active_readers))
+       if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
                lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
 
        //TODO timers
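
Editor's note: the active_shadow_readers count tested here and in the next hunk is not declared in this file; it is presumably added alongside active_readers in struct lib_ring_buffer (frontend_types.h in this tree). A minimal sketch of the assumed declaration, only to make these hunks easier to read:

	/*
	 * Sketch only, not part of this patch: the shadow reader count is assumed
	 * to sit next to the existing reader count in struct lib_ring_buffer, so
	 * the periodic flush/poll timers can treat an external (shadow-mapped)
	 * consumer like a local reader.
	 */
	struct lib_ring_buffer {
		/* ... existing fields ... */
		unsigned long active_readers;		/* exclusive in-process reader */
		unsigned long active_shadow_readers;	/* exclusive external consumer */
		/* ... */
	};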
@@ -307,7 +307,7 @@ static void read_buffer_timer(unsigned long data)
 
        CHAN_WARN_ON(chan, !buf->backend.allocated);
 
-       if (uatomic_read(&buf->active_readers)
+       if ((uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
            && lib_ring_buffer_poll_deliver(config, buf, chan)) {
                //TODO
                //wake_up_interruptible(&buf->read_wait);
@@ -395,11 +395,13 @@ static void channel_unregister_notifiers(struct channel *chan,
        //channel_backend_unregister_notifiers(&chan->backend);
 }
 
-static void channel_free(struct channel *chan, struct shm_handle *handle)
+static void channel_free(struct channel *chan, struct shm_handle *handle,
+               int shadow)
 {
        int ret;
 
-       channel_backend_free(&chan->backend, handle);
+       if (!shadow)
+               channel_backend_free(&chan->backend, handle);
        /* chan is freed by shm teardown */
        shm_object_table_destroy(handle->table);
        free(handle);
@@ -550,9 +552,10 @@ int channel_handle_add_stream(struct shm_handle *handle,
 }
 
 static
-void channel_release(struct channel *chan, struct shm_handle *handle)
+void channel_release(struct channel *chan, struct shm_handle *handle,
+               int shadow)
 {
-       channel_free(chan, handle);
+       channel_free(chan, handle, shadow);
 }
 
 /**
@@ -566,12 +569,18 @@ void channel_release(struct channel *chan, struct shm_handle *handle)
  * They should release their handle at that point.  Returns the private
  * data pointer.
  */
-void *channel_destroy(struct channel *chan, struct shm_handle *handle)
+void *channel_destroy(struct channel *chan, struct shm_handle *handle,
+               int shadow)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        void *priv;
        int cpu;
 
+       if (shadow) {
+               channel_release(chan, handle, shadow);
+               return NULL;
+       }
+
        channel_unregister_notifiers(chan, handle);
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
@@ -615,7 +624,7 @@ void *channel_destroy(struct channel *chan, struct shm_handle *handle)
         * descriptor directly. No need to refcount.
         */
        priv = chan->backend.priv;
-       channel_release(chan, handle);
+       channel_release(chan, handle, shadow);
        return priv;
 }
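
Editor's note: taken together with the channel_free() change above, a shadow handle only tears down the calling process's view of the channel: notifier teardown and the per-CPU buffer teardown are skipped, and channel_free() with shadow=1 skips channel_backend_free(), freeing only the handle's shm object table and the handle itself. A hypothetical consumer-side caller (consumer_close_channel is not part of this patch), assuming the external consumer obtained its own struct shm_handle for the channel:

	static
	void consumer_close_channel(struct channel *chan, struct shm_handle *handle)
	{
		void *priv;

		/* shadow = 1: drop only this process's mapping; the tracer keeps the backend */
		priv = channel_destroy(chan, handle, 1);
		/* channel_destroy() returns NULL for shadow handles, so no private data here */
		(void) priv;
	}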
 
@@ -642,10 +651,17 @@ struct lib_ring_buffer *channel_get_ring_buffer(
 }
 
 int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
-                             struct shm_handle *handle)
+                             struct shm_handle *handle,
+                             int shadow)
 {
        struct channel *chan = shmp(handle, buf->backend.chan);
 
+       if (shadow) {
+               if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
+                       return -EBUSY;
+               cmm_smp_mb();
+               return 0;
+       }
        if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
                return -EBUSY;
        cmm_smp_mb();
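
Editor's note: a hypothetical attach step on the consumer side (consumer_attach_buffer is not part of this patch), assuming buf and handle come from the consumer's shadow mapping of the channel. The cmpxchg above claims exclusive shadow-read access, so a second consumer attaching to the same buffer sees -EBUSY:

	static
	int consumer_attach_buffer(struct lib_ring_buffer *buf, struct shm_handle *handle)
	{
		/* shadow = 1: claim active_shadow_readers rather than active_readers */
		return lib_ring_buffer_open_read(buf, handle, 1);	/* -EBUSY if already claimed */
	}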
@@ -653,10 +669,17 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
 }
 
 void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
-                                 struct shm_handle *handle)
+                                 struct shm_handle *handle,
+                                 int shadow)
 {
        struct channel *chan = shmp(handle, buf->backend.chan);
 
+       if (shadow) {
+               CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
+               cmm_smp_mb();
+               uatomic_dec(&buf->active_shadow_readers);
+               return;
+       }
        CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
        cmm_smp_mb();
        uatomic_dec(&buf->active_readers);
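
Editor's note: the matching detach for the sketch above (again hypothetical, not in the patch). The cmm_smp_mb() in the release path pairs with the one in open, ordering the consumer's last reads of the buffer before the shadow flag is dropped:

	static
	void consumer_detach_buffer(struct lib_ring_buffer *buf, struct shm_handle *handle)
	{
		/* shadow = 1: releases active_shadow_readers claimed at attach time */
		lib_ring_buffer_release_read(buf, handle, 1);
	}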
@@ -734,7 +757,8 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
        struct channel *chan = shmp(handle, bufb->chan);
        unsigned long consumed;
 
-       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
+       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
+                       && uatomic_read(&buf->active_shadow_readers) != 1);
 
        /*
         * Only push the consumed value forward.
@@ -857,7 +881,8 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf,
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long read_sb_bindex, consumed_idx, consumed;
 
-       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
+       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
+                       && uatomic_read(&buf->active_shadow_readers) != 1);
 
        if (!buf->get_subbuf) {
                /*