From: Mathieu Desnoyers Date: Mon, 25 Jul 2011 18:34:05 +0000 (-0400) Subject: Implement Lai Jiangshan's algorithm in RCU lock-free queue X-Git-Url: http://git.liburcu.org/?a=commitdiff_plain;h=2d37e098840ee195a05fee99ad4bb6cde08e34d6;p=userspace-rcu.git Implement Lai Jiangshan's algorithm in RCU lock-free queue Signed-off-by: Mathieu Desnoyers --- diff --git a/rculfqueue.c b/rculfqueue.c index 09ba9cf..eac5e8b 100644 --- a/rculfqueue.c +++ b/rculfqueue.c @@ -38,11 +38,14 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) _cds_lfq_node_init_rcu(node); } -void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void queue_call_rcu(struct rcu_head *head, - void (*func)(struct rcu_head *head))) +void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q) { - _cds_lfq_init_rcu(q, queue_call_rcu); + _cds_lfq_init_rcu(q); +} + +int cds_lfq_is_empty(struct cds_lfq_queue_rcu *q) +{ + return _cds_lfq_is_empty(q); } int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c index b61a7d4..0132d02 100644 --- a/tests/test_urcu_lfq.c +++ b/tests/test_urcu_lfq.c @@ -363,7 +363,7 @@ int main(int argc, char **argv) tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers); count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers); count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers); - cds_lfq_init_rcu(&q, call_rcu); + cds_lfq_init_rcu(&q); next_aff = 0; diff --git a/urcu/rculfqueue.h b/urcu/rculfqueue.h index 598fa50..c2014aa 100644 --- a/urcu/rculfqueue.h +++ b/urcu/rculfqueue.h @@ -6,7 +6,8 @@ * * Userspace RCU library - Lock-Free RCU Queue * - * Copyright 2010 - Mathieu Desnoyers + * Copyright 2010-2011 - Mathieu Desnoyers + * Copyright 2011 - Lai Jiangshan * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -33,14 +34,12 @@ extern "C" { struct cds_lfq_queue_rcu; struct cds_lfq_node_rcu { - struct cds_lfq_node_rcu *next; - int dummy; + unsigned long next; }; struct cds_lfq_queue_rcu { - struct cds_lfq_node_rcu *head, *tail; - void (*queue_call_rcu)(struct rcu_head *head, - void (*func)(struct rcu_head *head)); + unsigned long tail; + struct cds_lfq_node_rcu head; }; #ifdef _LGPL_SOURCE @@ -49,6 +48,7 @@ struct cds_lfq_queue_rcu { #define cds_lfq_node_init_rcu _cds_lfq_node_init_rcu #define cds_lfq_init_rcu _cds_lfq_init_rcu +#define cds_lfq_is_empty _cds_lfq_is_empty #define cds_lfq_destroy_rcu _cds_lfq_destroy_rcu #define cds_lfq_enqueue_rcu _cds_lfq_enqueue_rcu #define cds_lfq_dequeue_rcu _cds_lfq_dequeue_rcu @@ -56,9 +56,10 @@ struct cds_lfq_queue_rcu { #else /* !_LGPL_SOURCE */ extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node); -extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void queue_call_rcu(struct rcu_head *head, - void (*func)(struct rcu_head *head))); +extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q); + +extern int cds_lfq_is_empty(struct cds_lfq_queue_rcu *q); + /* * The queue should be emptied before calling destroy. * diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index fea6110..988f982 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -6,7 +6,8 @@ * * Userspace RCU library - Lock-Free RCU Queue * - * Copyright 2010 - Mathieu Desnoyers + * Copyright 2010-2011 - Mathieu Desnoyers + * Copyright 2011 - Lai Jiangshan * * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See rculfqueue.h for linking * dynamically with the userspace rcu library. @@ -35,87 +36,117 @@ extern "C" { #endif -struct cds_lfq_node_rcu_dummy { - struct cds_lfq_node_rcu parent; - struct rcu_head head; - struct cds_lfq_queue_rcu *q; +enum node_type { + NODE_NODE = 0, + NODE_HEAD = 1, + NODE_NULL = 2, /* transitional */ }; +#define NODE_TYPE_BITS 2 +#define NODE_TYPE_MASK ((1UL << NODE_TYPE_BITS) - 1) + /* - * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read - * lock to deal with cmpxchg ABA problem. This queue is *not* circular: - * head points to the oldest node, tail points to the newest node. - * A dummy node is kept to ensure enqueue and dequeue can always proceed - * concurrently. Keeping a separate head and tail helps with large - * queues: enqueue and dequeue can proceed concurrently without - * wrestling for exclusive access to the same variables. + * Lock-free RCU queue. + * + * Node addresses must be allocated on multiples of 4 bytes, because the + * two bottom bits are used internally. "Special" HEAD and NULL node + * references use a sequence counter (rather than an address). The + * sequence count is incremented as elements are enqueued. Enqueue and + * dequeue operations hold a RCU read lock to deal with uatomic_cmpxchg + * ABA problem on standard node addresses. The sequence count of HEAD + * and NULL nodes deals with ABA problem with these nodes. + * + * Keeping a sequence count throughout the list allows dealing with + * dequeue-the-last/enqueue-the-first operations without need for adding + * any dummy node in the queue. * - * Dequeue retry if it detects that it would be dequeueing the last node - * (it means a dummy node dequeue-requeue is in progress). This ensures - * that there is always at least one node in the queue. + * This queue is not circular. The head node is located prior to the + * oldest node, tail points to the newest node. * - * In the dequeue operation, we internally reallocate the dummy node - * upon dequeue/requeue and use call_rcu to free the old one after a - * grace period. + * Keeping a separate head and tail helps with large queues: enqueue and + * dequeue can proceed concurrently without wrestling for exclusive + * access to the same variables. */ static inline -struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, - struct cds_lfq_node_rcu *next) +enum node_type queue_node_type(unsigned long node) { - struct cds_lfq_node_rcu_dummy *dummy; - - dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy)); - assert(dummy); - dummy->parent.next = next; - dummy->parent.dummy = 1; - dummy->q = q; - return &dummy->parent; + return node & NODE_TYPE_MASK; } static inline -void free_dummy_cb(struct rcu_head *head) +unsigned long queue_node_seq(unsigned long node) { - struct cds_lfq_node_rcu_dummy *dummy = - caa_container_of(head, struct cds_lfq_node_rcu_dummy, head); - free(dummy); + assert(queue_node_type(node) == NODE_HEAD + || queue_node_type(node) == NODE_NULL); + return node >> NODE_TYPE_BITS; } static inline -void rcu_free_dummy(struct cds_lfq_node_rcu *node) +struct cds_lfq_node_rcu *queue_node_node(unsigned long node) { - struct cds_lfq_node_rcu_dummy *dummy; + assert(queue_node_type(node) == NODE_NODE); + return (void *) (node & ~NODE_TYPE_MASK); +} - assert(node->dummy); - dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); - dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb); +static inline +unsigned long queue_make_node(struct cds_lfq_node_rcu *node) +{ + return ((unsigned long) node) | NODE_NODE; } static inline -void free_dummy(struct cds_lfq_node_rcu *node) +unsigned long queue_make_head(unsigned long seq) { - struct cds_lfq_node_rcu_dummy *dummy; + return (seq << NODE_TYPE_BITS) | NODE_HEAD; +} - assert(node->dummy); - dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); - free(dummy); +static inline +unsigned long queue_make_null(unsigned long seq) +{ + return (seq << NODE_TYPE_BITS) | NODE_NULL; } + static inline void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) { - node->next = NULL; - node->dummy = 0; + /* Kept here for object debugging. */ +} + +static inline +void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q) +{ + q->head.next = queue_make_head(0); + q->tail = queue_make_head(0); } static inline -void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void queue_call_rcu(struct rcu_head *head, - void (*func)(struct rcu_head *head))) +int _cds_lfq_is_empty(struct cds_lfq_queue_rcu *q) { - q->tail = make_dummy(q, NULL); - q->head = q->tail; - q->queue_call_rcu = queue_call_rcu; + unsigned long head, next; + struct cds_lfq_node_rcu *phead; + + head = rcu_dereference(q->head.next); + if (queue_node_type(head) == NODE_HEAD) { + /* F0 or T0b */ + return 1; + } + + phead = queue_node_node(head); + next = rcu_dereference(phead->next); + + if (queue_node_type(next) == NODE_HEAD) { /* T1, F1 */ + /* only one node */ + return 0; + } else if (queue_node_type(next) == NODE_NODE) { + /* have >=2 nodes, Tn{n>=2} Fn{n>=2} */ + return 0; + } else { + /* T0a */ + assert(queue_node_type(next) == NODE_NULL); + return 1; + } } /* @@ -126,98 +157,223 @@ void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, static inline int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) { - struct cds_lfq_node_rcu *head; - - head = rcu_dereference(q->head); - if (!(head->dummy && head->next == NULL)) - return -EPERM; /* not empty */ - free_dummy(head); + if (!_cds_lfq_is_empty(q)) + return -EPERM; + /* Kept here for object debugging. */ return 0; } +static void __queue_post_dequeue_the_last(struct cds_lfq_queue_rcu *q, + unsigned long old_head, unsigned long new_head); + +/* + * Lock-free queue uses rcu_read_lock() to proctect the life-time + * of the nodes and to prevent ABA-problem. + * + * cds_lfq_enqueue_rcu() and cds_lfq_dequeue_rcu() need to be called + * under rcu read lock critical section. The node returned by + * queue_dequeue() must not be modified/re-used/freed until a + * grace-period passed. + */ + +static inline +unsigned long __queue_enqueue(struct cds_lfq_queue_rcu *q, + unsigned long tail, unsigned long next, + struct cds_lfq_node_rcu *ptail, struct cds_lfq_node_rcu *pnode) +{ + unsigned long newnext; + + /* increase the seq for every enqueued node */ + pnode->next = queue_make_head(queue_node_seq(next) + 1); + + /* Fn(seq) -> T(n+1)(seq+1) */ + newnext = uatomic_cmpxchg(&ptail->next, next, queue_make_node(pnode)); + + if (newnext != next) + return newnext; + + /* success, move tail(or done by other), T(n+1) -> F(n+1) */ + uatomic_cmpxchg(&q->tail, tail, queue_make_node(pnode)); + return next; +} + /* - * Should be called under rcu read lock critical section. + * Needs to be called with RCU read-side lock held. */ static inline void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, - struct cds_lfq_node_rcu *node) + struct cds_lfq_node_rcu *pnode) { - /* - * uatomic_cmpxchg() implicit memory barrier orders earlier stores to - * node before publication. - */ + unsigned long tail, next; + struct cds_lfq_node_rcu *ptail; for (;;) { - struct cds_lfq_node_rcu *tail, *next; - tail = rcu_dereference(q->tail); - next = uatomic_cmpxchg(&tail->next, NULL, node); - if (next == NULL) { - /* - * Tail was at the end of queue, we successfully - * appended to it. Now move tail (another - * enqueue might beat us to it, that's fine). - */ - (void) uatomic_cmpxchg(&q->tail, tail, node); - return; - } else { + if (queue_node_type(tail) == NODE_HEAD) { /* F0 */ + ptail = &q->head; + next = tail; /* - * Failure to append to current tail. - * Help moving tail further and retry. + * We cannot use "next = rcu_dereference(ptail->next);" + * here, because it is control dependency, not data + * dependency. But since F0 is the most likely state + * when 0 node, so we use 'next = tail'. */ - (void) uatomic_cmpxchg(&q->tail, tail, next); - continue; + } else { /* Fn, Tn */ + ptail = queue_node_node(tail); + next = rcu_dereference(ptail->next); + } + + if (queue_node_type(next) == NODE_HEAD) { /* Fn */ + unsigned long newnext; + + /* Fn{n>=0} -> F(n+1) */ + newnext = __queue_enqueue(q, tail, next, ptail, pnode); + if (newnext == next) { + return; + } + next = newnext; + } + + if (queue_node_type(next) == NODE_NODE) { /* Tn */ + /* help moving tail, Tn{n>=1} -> Fn */ + uatomic_cmpxchg(&q->tail, tail, next); + } else if (queue_node_type(next) == NODE_NULL) { + /* help finishing dequeuing the last, T0a or T0b -> F0 */ + __queue_post_dequeue_the_last(q, tail, + queue_make_head(queue_node_seq(next))); } } } static inline -void enqueue_dummy(struct cds_lfq_queue_rcu *q) +void __queue_post_dequeue_the_last(struct cds_lfq_queue_rcu *q, + unsigned long old_head, unsigned long new_head) +{ + /* step2: T0a -> T0b */ + uatomic_cmpxchg(&q->head.next, old_head, new_head); + + /* step3: T0b -> F0 */ + uatomic_cmpxchg(&q->tail, old_head, new_head); +} + +static inline +int __queue_dequeue_the_last(struct cds_lfq_queue_rcu *q, + unsigned long head, unsigned long next, + struct cds_lfq_node_rcu *plast) { - struct cds_lfq_node_rcu *node; + unsigned long origin_tail = rcu_dereference(q->tail); - /* We need to reallocate to protect from ABA. */ - node = make_dummy(q, NULL); - _cds_lfq_enqueue_rcu(q, node); + /* + * T1 -> F1 if T1, we cannot dequeue the last node when T1. + * + * pseudocode is: + * tail = rcu_dereference(q->tail); (*) + * if (tail == queue_make_head(seq - 1)) + * uatomic_cmpxchg(&q->tail, tail, head); + * But we only expect (*) gets tail's value is: + * head (F1)(likely got) + * queue_make_head(seq - 1) (T1) + * not newer nor older value, so the pseudocode is not acceptable. + */ + if (origin_tail != head) { + unsigned long tail; + + /* Don't believe the orderless-read tail! */ + origin_tail = queue_make_head(queue_node_seq(next) - 1); + + /* help moving tail, T1 -> F1 */ + tail = uatomic_cmpxchg(&q->tail, origin_tail, head); + + if (tail != origin_tail && tail != head) + return 0; + } + + /* step1: F1 -> T0a */ + if (uatomic_cmpxchg(&plast->next, next, queue_make_null(queue_node_seq(next))) != next) + return 0; + + __queue_post_dequeue_the_last(q, head, next); + return 1; +} + +static inline +int __queue_dequeue(struct cds_lfq_queue_rcu *q, + unsigned long head, unsigned long next) +{ + struct cds_lfq_node_rcu *pnext = queue_node_node(next); + unsigned long nextnext = rcu_dereference(pnext->next); + + /* + * T2 -> F2 if T2, we cannot dequeue the first node when T2. + * + * pseudocode is: + * tail = rcu_dereference(q->tail); (*) + * if (tail == head) + * uatomic_cmpxchg(&q->tail, head, next); + * But we only expect (*) gets tail's value is: + * node in the queue + * not older value, the older value cause us save a uatomic_cmpxchg() wrongly, + * so the pseudocode is not acceptable. + * + * using uatomic_cmpxchg always is OK, but it adds a uatomic_cmpxchg overhead always: + * uatomic_cmpxchg(&q->tail, head, next); + */ + if (queue_node_type(nextnext) == NODE_HEAD) { /* 2 nodes */ + unsigned long tail = rcu_dereference(q->tail); + + /* + * tail == next: now is F2, don't need help moving tail + * tail != next: it is unlikely when 2 nodes. + * Don't believe the orderless-read tail! + */ + if (tail != next) + uatomic_cmpxchg(&q->tail, head, next); /* help for T2 -> F2 */ + } + + /* Fn{n>=2} -> F(n-1), Tn{n>=3} -> T(n-1) */ + if (uatomic_cmpxchg(&q->head.next, head, next) != head) + return 0; + + return 1; } /* - * Should be called under rcu read lock critical section. - * - * The caller must wait for a grace period to pass before freeing the returned - * node or modifying the cds_lfq_node_rcu structure. - * Returns NULL if queue is empty. + * Needs to be called with rcu read-side lock held. + * Wait for a grace period before freeing/reusing the returned node. + * If NULL is returned, the queue is empty. */ static inline struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { - for (;;) { - struct cds_lfq_node_rcu *head, *next; + unsigned long head, next; + struct cds_lfq_node_rcu *phead; - head = rcu_dereference(q->head); - next = rcu_dereference(head->next); - if (head->dummy && next == NULL) - return NULL; /* empty */ - /* - * We never, ever allow dequeue to get to a state where - * the queue is empty (we need at least one node in the - * queue). This is ensured by checking if the head next - * is NULL, which means we need to enqueue a dummy node - * before we can hope dequeuing anything. - */ - if (!next) { - enqueue_dummy(q); - next = rcu_dereference(head->next); + for (;;) { + head = rcu_dereference(q->head.next); + if (queue_node_type(head) == NODE_HEAD) { + /* F0 or T0b */ + return NULL; } - if (uatomic_cmpxchg(&q->head, head, next) != head) - continue; /* Concurrently pushed. */ - if (head->dummy) { - /* Free dummy after grace period. */ - rcu_free_dummy(head); - continue; /* try again */ + + phead = queue_node_node(head); + next = rcu_dereference(phead->next); + + if (queue_node_type(next) == NODE_HEAD) { /* T1, F1 */ + /* dequeue when only one node */ + if (__queue_dequeue_the_last(q, head, next, phead)) + goto done; + } else if (queue_node_type(next) == NODE_NODE) { + /* dequeue when have >=2 nodes, Tn{n>=2} Fn{n>=2} */ + if (__queue_dequeue(q, head, next)) + goto done; + } else { + /* T0a */ + assert(queue_node_type(next) == NODE_NULL); + return NULL; } - return head; } +done: + return phead; } #ifdef __cplusplus