cds containers: lfqueue and lfstack: don't depend on a particular rcu flavor
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 10 Jun 2011 00:31:29 +0000 (20:31 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 10 Jun 2011 00:31:29 +0000 (20:31 -0400)
Remove rcu_read lock/unlock from the code, require the caller to ensure
protection. First move towards a single .so for all data containers.

This changes the lfqueue API, which is not very much used yet AFAIK.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
rculfqueue.c
rculfstack.c
tests/test_urcu_lfq.c
tests/test_urcu_lfs.c
urcu/rculfqueue-static.h
urcu/rculfqueue.h
urcu/rculfstack-static.h
urcu/rculfstack.h

index 508e05c..c844525 100644 (file)
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#define _LGPL_SOURCE
 /* Use the urcu symbols to select the appropriate rcu flavor at link time */
 #include "urcu.h"
+
+#undef _LGPL_SOURCE
 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
 #include "urcu/rculfqueue.h"
 #include "urcu/rculfqueue-static.h"
@@ -35,9 +38,10 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
        _cds_lfq_node_init_rcu(node);
 }
 
-void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+                     void (*release)(struct urcu_ref *ref))
 {
-       _cds_lfq_init_rcu(q);
+       _cds_lfq_init_rcu(q, release);
 }
 
 void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node)
@@ -46,7 +50,7 @@ void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *n
 }
 
 struct cds_lfq_node_rcu *
-cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *))
+cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
 {
-       return _cds_lfq_dequeue_rcu(q, release);
+       return _cds_lfq_dequeue_rcu(q);
 }
index 2c26046..ca50232 100644 (file)
  */
 
 /* Use the urcu symbols to select the appropriate rcu flavor at link time */
+#define _LGPL_SOURCE
 #include "urcu.h"
+
+#undef _LGPL_SOURCE
 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
 #include "urcu/rculfstack.h"
 #include "urcu/rculfstack-static.h"
index 7aaad99..15023f6 100644 (file)
@@ -177,7 +177,9 @@ void *thr_enqueuer(void *_count)
                if (!node)
                        goto fail;
                cds_lfq_node_init_rcu(node);
+               rcu_read_lock();
                cds_lfq_enqueue_rcu(&q, node);
+               rcu_read_unlock();
                nr_successful_enqueues++;
 
                if (unlikely(wdelay))
@@ -200,12 +202,18 @@ fail:
 
 }
 
-static void rcu_release_node(struct urcu_ref *ref)
+static void rcu_free_node(struct rcu_head *head)
 {
-       struct cds_lfq_node_rcu *node = caa_container_of(ref, struct cds_lfq_node_rcu, ref);
-       defer_rcu(free, node);
-       //synchronize_rcu();
-       //free(node);
+       struct cds_lfq_node_rcu *node =
+               caa_container_of(head, struct cds_lfq_node_rcu, rcu_head);
+       free(node);
+}
+
+static void ref_release_node(struct urcu_ref *ref)
+{
+       struct cds_lfq_node_rcu *node =
+               caa_container_of(ref, struct cds_lfq_node_rcu, ref);
+       call_rcu(&node->rcu_head, rcu_free_node);
 }
 
 void *thr_dequeuer(void *_count)
@@ -231,11 +239,14 @@ void *thr_dequeuer(void *_count)
        cmm_smp_mb();
 
        for (;;) {
-               struct cds_lfq_node_rcu *node = cds_lfq_dequeue_rcu(&q,
-                                                           rcu_release_node);
+               struct cds_lfq_node_rcu *node;
+
+               rcu_read_lock();
+               node = cds_lfq_dequeue_rcu(&q);
+               rcu_read_unlock();
 
                if (node) {
-                       urcu_ref_put(&node->ref, rcu_release_node);
+                       urcu_ref_put(&node->ref, ref_release_node);
                        nr_successful_dequeues++;
                }
 
@@ -269,7 +280,9 @@ void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues)
        struct cds_lfq_node_rcu *node;
 
        do {
-               node = cds_lfq_dequeue_rcu(q, release_node);
+               rcu_read_lock();
+               node = cds_lfq_dequeue_rcu(q);
+               rcu_read_unlock();
                if (node) {
                        urcu_ref_put(&node->ref, release_node);
                        (*nr_dequeues)++;
@@ -368,7 +381,7 @@ int main(int argc, char **argv)
        tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
        count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers);
        count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers);
-       cds_lfq_init_rcu(&q);
+       cds_lfq_init_rcu(&q, ref_release_node);
 
        next_aff = 0;
 
index 834c72e..71596fd 100644 (file)
@@ -177,6 +177,7 @@ void *thr_enqueuer(void *_count)
                if (!node)
                        goto fail;
                cds_lfs_node_init_rcu(node);
+               /* No rcu read-side is needed for push */
                cds_lfs_push_rcu(&s, node);
                nr_successful_enqueues++;
 
@@ -223,13 +224,15 @@ void *thr_dequeuer(void *_count)
        cmm_smp_mb();
 
        for (;;) {
-               struct cds_lfs_node_rcu *node = cds_lfs_pop_rcu(&s);
+               struct cds_lfs_node_rcu *node;
 
+               rcu_read_lock();
+               node = cds_lfs_pop_rcu(&s);
+               rcu_read_unlock();
                if (node) {
                        defer_rcu(free, node);
                        nr_successful_dequeues++;
                }
-
                nr_dequeues++;
                if (unlikely(!test_duration_dequeue()))
                        break;
index d7c359f..be6a4bc 100644 (file)
@@ -53,17 +53,24 @@ void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
        urcu_ref_init(&node->ref);
 }
 
-void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+                      void (*release)(struct urcu_ref *ref))
 {
        _cds_lfq_node_init_rcu(&q->init);
        /* Make sure the initial node is never freed. */
        urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF);
        q->head = q->tail = &q->init;
+       q->release = release;
 }
 
-void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node)
+/*
+ * Should be called under rcu read lock critical section.
+ */
+void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
+                         struct cds_lfq_node_rcu *node)
 {
        urcu_ref_get(&node->ref);
+       node->queue = q;
 
        /*
         * uatomic_cmpxchg() implicit memory barrier orders earlier stores to
@@ -73,7 +80,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *
        for (;;) {
                struct cds_lfq_node_rcu *tail, *next;
 
-               rcu_read_lock();
                tail = rcu_dereference(q->tail);
                /*
                 * Typically expect tail->next to be NULL.
@@ -87,7 +93,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *
                         * us to it, that's fine).
                         */
                        (void) uatomic_cmpxchg(&q->tail, tail, node);
-                       rcu_read_unlock();
                        return;
                } else {
                        /*
@@ -95,44 +100,39 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *
                         * further and retry.
                         */
                        (void) uatomic_cmpxchg(&q->tail, tail, next);
-                       rcu_read_unlock();
                        continue;
                }
        }
 }
 
 /*
- * The entry returned by dequeue must be taken care of by doing a urcu_ref_put,
- * which calls the release primitive when the reference count drops to zero. A
- * grace period must be waited after execution of the release callback before
- * performing the actual memory reclamation or modifying the cds_lfq_node_rcu
- * structure.
+ * Should be called under rcu read lock critical section.
+ *
+ * The entry returned by dequeue must be taken care of by doing a
+ * urcu_ref_put after a grace period passes.
+ *
  * In other words, the entry lfq node returned by dequeue must not be
  * modified/re-used/freed until the reference count reaches zero and a grace
- * period has elapsed (after the refcount reached 0).
+ * period has elapsed.
  */
 struct cds_lfq_node_rcu *
-_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *))
+_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
 {
        for (;;) {
                struct cds_lfq_node_rcu *head, *next;
 
-               rcu_read_lock();
                head = rcu_dereference(q->head);
                next = rcu_dereference(head->next);
                if (next) {
                        if (uatomic_cmpxchg(&q->head, head, next) == head) {
-                               rcu_read_unlock();
-                               urcu_ref_put(&head->ref, release);
+                               urcu_ref_put(&head->ref, q->release);
                                return next;
                        } else {
                                /* Concurrently pushed, retry */
-                               rcu_read_unlock();
                                continue;
                        }
                } else {
                        /* Empty */
-                       rcu_read_unlock();
                        return NULL;
                }
        }
index 7b3f81a..55f15f8 100644 (file)
@@ -40,14 +40,19 @@ extern "C" {
  * which point their reference count will be decremented.
  */
 
+struct cds_lfq_queue_rcu;
+
 struct cds_lfq_node_rcu {
        struct cds_lfq_node_rcu *next;
        struct urcu_ref ref;
+       struct cds_lfq_queue_rcu *queue;
+       struct rcu_head rcu_head;
 };
 
 struct cds_lfq_queue_rcu {
        struct cds_lfq_node_rcu *head, *tail;
        struct cds_lfq_node_rcu init;   /* Dummy initialization node */
+       void (*release)(struct urcu_ref *ref);
 };
 
 #ifdef _LGPL_SOURCE
@@ -62,21 +67,28 @@ struct cds_lfq_queue_rcu {
 #else /* !_LGPL_SOURCE */
 
 extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node);
-extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q);
-extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node);
+extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+                            void (*release)(struct urcu_ref *ref));
+
+/*
+ * Should be called under rcu read lock critical section.
+ */
+extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
+                               struct cds_lfq_node_rcu *node);
 
 /*
- * The entry returned by dequeue must be taken care of by doing a urcu_ref_put,
- * which calls the release primitive when the reference count drops to zero. A
- * grace period must be waited after execution of the release callback before
- * performing the actual memory reclamation or modifying the cds_lfq_node_rcu
- * structure.
+ * Should be called under rcu read lock critical section.
+ *
+ * The entry returned by dequeue must be taken care of by doing a
+ * urcu_delayed_ref_put, which calls the release primitive after the
+ * reference count drops to zero _and_ a following grace period passes.
+ *
  * In other words, the entry lfq node returned by dequeue must not be
  * modified/re-used/freed until the reference count reaches zero and a grace
  * period has elapsed (after the refcount reached 0).
  */
 extern struct cds_lfq_node_rcu *
-cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *));
+cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q);
 
 #endif /* !_LGPL_SOURCE */
 
index f541d40..7caf3c8 100644 (file)
@@ -61,6 +61,8 @@ void _cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, struct cds_lfs_node_rcu *nod
 }
 
 /*
+ * Should be called under rcu read-side lock.
+ *
  * The caller must wait for a grace period to pass before freeing the returned
  * node or modifying the cds_lfs_node_rcu structure.
  * Returns NULL if stack is empty.
@@ -71,22 +73,18 @@ _cds_lfs_pop_rcu(struct cds_lfs_stack_rcu *s)
        for (;;) {
                struct cds_lfs_node_rcu *head;
 
-               rcu_read_lock();
                head = rcu_dereference(s->head);
                if (head) {
                        struct cds_lfs_node_rcu *next = rcu_dereference(head->next);
 
                        if (uatomic_cmpxchg(&s->head, head, next) == head) {
-                               rcu_read_unlock();
                                return head;
                        } else {
                                /* Concurrent modification. Retry. */
-                               rcu_read_unlock();
                                continue;
                        }
                } else {
                        /* Empty stack */
-                       rcu_read_unlock();
                        return NULL;
                }
        }
index 20ac26c..d759854 100644 (file)
@@ -51,6 +51,8 @@ extern void cds_lfs_init_rcu(struct cds_lfs_stack_rcu *s);
 extern void cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, struct cds_lfs_node_rcu *node);
 
 /*
+ * Should be called under rcu read lock critical section.
+ *
  * The caller must wait for a grace period to pass before freeing the returned
  * node or modifying the cds_lfs_node_rcu structure.
  * Returns NULL if stack is empty.
This page took 0.041065 seconds and 4 git commands to generate.