From f671981162a0a84135797ba01661c5c832c5e430 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Fri, 1 Jul 2011 18:50:15 -0400 Subject: [PATCH] rcu lfqueue: make dequeue lockless by helping out other dequeuers Signed-off-by: Mathieu Desnoyers --- urcu/static/rculfqueue.h | 97 +++++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index 6045c18..f3df7c0 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -59,12 +59,6 @@ struct cds_lfq_node_rcu_dummy { * grace period. */ -static inline -int is_dummy(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) -{ - return node == CMM_LOAD_SHARED(q->dummy); -} - static inline struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *next) @@ -79,7 +73,7 @@ struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, } static inline -void free_dummy(struct rcu_head *head) +void free_dummy_cb(struct rcu_head *head) { struct cds_lfq_node_rcu_dummy *dummy = caa_container_of(head, struct cds_lfq_node_rcu_dummy, head); @@ -92,7 +86,16 @@ void rcu_free_dummy(struct cds_lfq_node_rcu *node) struct cds_lfq_node_rcu_dummy *dummy; dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); - dummy->q->queue_call_rcu(&dummy->head, free_dummy); + dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb); +} + +static inline +void free_dummy(struct cds_lfq_node_rcu *node) +{ + struct cds_lfq_node_rcu_dummy *dummy; + + dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); + free(dummy); } static inline @@ -123,9 +126,9 @@ int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) struct cds_lfq_node_rcu *head; head = rcu_dereference(q->head); - if (!(is_dummy(q, head) && head->next == NULL)) + if (!(head == q->dummy && head->next == NULL)) return -EPERM; /* not empty */ - rcu_free_dummy(head); + free_dummy(head); return 0; } @@ -165,6 +168,21 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, } } +static inline +void reenqueue_dummy(struct cds_lfq_queue_rcu *q) +{ + struct cds_lfq_node_rcu *node; + + /* We need to reallocate to protect from ABA. */ + node = make_dummy(q, NULL); + if (uatomic_cmpxchg(&q->dummy, NULL, node) != NULL) { + /* other dequeue populated its new dummy */ + free_dummy(node); + return; + } + _cds_lfq_enqueue_rcu(q, node); +} + /* * Should be called under rcu read lock critical section. * @@ -176,52 +194,39 @@ static inline struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { for (;;) { - struct cds_lfq_node_rcu *head, *next; + struct cds_lfq_node_rcu *head, *next, *dummy; head = rcu_dereference(q->head); next = rcu_dereference(head->next); - if (is_dummy(q, head) && next == NULL) + dummy = rcu_dereference(q->dummy); + if (head == dummy && next == NULL) return NULL; /* empty */ /* * We never, ever allow dequeue to get to a state where * the queue is empty (we need at least one node in the * queue). This is ensured by checking if the head next - * is NULL and retry in that case (this means a - * concurrent dummy node re-enqueue is in progress). + * is NULL. This means a concurrent dummy node + * re-enqueue is in progress. We help it reach + * completion, and retry. */ - if (next) { - if (uatomic_cmpxchg(&q->head, head, next) == head) { - if (is_dummy(q, head)) { - struct cds_lfq_node_rcu *node; - /* - * Requeue dummy. We need to - * reallocate to protect from - * ABA. - */ - rcu_free_dummy(head); - node = make_dummy(q, NULL); - /* - * We are the only thread - * allowed to update dummy (we - * own the old dummy). Other - * dequeue threads read it - * concurrently with RCU - * read-lock held, which - * protects from ABA. - */ - CMM_STORE_SHARED(q->dummy, node); - _cds_lfq_enqueue_rcu(q, node); - continue; /* try again */ - } - return head; - } else { - /* Concurrently pushed, retry */ - continue; - } - } else { - /* Dummy node re-enqueue is in progress, retry. */ - continue; + if (!next) { + /* + * Dummy node re-enqueue is in progress. Try to + * help. + */ + reenqueue_dummy(q); + continue; /* try again */ + } + if (uatomic_cmpxchg(&q->head, head, next) != head) + continue; /* Concurrently pushed. */ + if (head == dummy) { + /* Free old and requeue new dummy. */ + rcu_set_pointer(&q->dummy, NULL); + rcu_free_dummy(dummy); + reenqueue_dummy(q); + continue; /* try again */ } + return head; } } -- 2.34.1