From: Mathieu Desnoyers Date: Fri, 1 Jul 2011 20:34:38 +0000 (-0400) Subject: RCU lfqueue: Now works without reference counting (API change) X-Git-Tag: v0.6.5~68 X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=commitdiff_plain;h=e17d9985aeb7eab28f3ab1ba5a3076b49d504958 RCU lfqueue: Now works without reference counting (API change) Signed-off-by: Mathieu Desnoyers --- diff --git a/rculfqueue.c b/rculfqueue.c index 0daee5d..09ba9cf 100644 --- a/rculfqueue.c +++ b/rculfqueue.c @@ -39,9 +39,15 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) } void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void (*release)(struct urcu_ref *ref)) + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))) { - _cds_lfq_init_rcu(q, release); + _cds_lfq_init_rcu(q, queue_call_rcu); +} + +int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) +{ + return _cds_lfq_destroy_rcu(q); } void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c index cb50586..b61a7d4 100644 --- a/tests/test_urcu_lfq.c +++ b/tests/test_urcu_lfq.c @@ -66,6 +66,7 @@ static inline pid_t gettid(void) #endif #include #include +#include static volatile int test_go, test_stop; @@ -204,20 +205,6 @@ fail: } -static void rcu_free_node(struct rcu_head *head) -{ - struct cds_lfq_node_rcu *node = - caa_container_of(head, struct cds_lfq_node_rcu, rcu_head); - free(node); -} - -static void ref_release_node(struct urcu_ref *ref) -{ - struct cds_lfq_node_rcu *node = - caa_container_of(ref, struct cds_lfq_node_rcu, ref); - call_rcu(&node->rcu_head, rcu_free_node); -} - void *thr_dequeuer(void *_count) { unsigned long long *count = _count; @@ -228,6 +215,11 @@ void *thr_dequeuer(void *_count) set_affinity(); + ret = rcu_defer_register_thread(); + if (ret) { + printf("Error in rcu_defer_register_thread\n"); + exit(-1); + } rcu_register_thread(); while (!test_go) @@ -243,7 +235,7 @@ void *thr_dequeuer(void *_count) rcu_read_unlock(); if (node) { - urcu_ref_put(&node->ref, ref_release_node); + defer_rcu(free, node); nr_successful_dequeues++; } @@ -255,6 +247,7 @@ void *thr_dequeuer(void *_count) } rcu_unregister_thread(); + rcu_defer_unregister_thread(); printf_verbose("dequeuer thread_end, thread id : %lx, tid %lu, " "dequeues %llu, successful_dequeues %llu\n", pthread_self(), (unsigned long)gettid(), nr_dequeues, @@ -264,12 +257,6 @@ void *thr_dequeuer(void *_count) return ((void*)2); } -static void release_node(struct urcu_ref *ref) -{ - struct cds_lfq_node_rcu *node = caa_container_of(ref, struct cds_lfq_node_rcu, ref); - free(node); -} - void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues) { struct cds_lfq_node_rcu *node; @@ -279,7 +266,7 @@ void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues) node = cds_lfq_dequeue_rcu(q); rcu_read_unlock(); if (node) { - urcu_ref_put(&node->ref, release_node); + free(node); /* no more concurrent access */ (*nr_dequeues)++; } } while (node); @@ -376,7 +363,7 @@ int main(int argc, char **argv) tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers); count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers); count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers); - cds_lfq_init_rcu(&q, ref_release_node); + cds_lfq_init_rcu(&q, call_rcu); next_aff = 0; @@ -421,6 +408,8 @@ int main(int argc, char **argv) } test_end(&q, &end_dequeues); + err = cds_lfq_destroy_rcu(&q); + assert(!err); printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues, tot_dequeues); diff --git a/urcu/rculfqueue.h b/urcu/rculfqueue.h index fbef6f9..757d133 100644 --- a/urcu/rculfqueue.h +++ b/urcu/rculfqueue.h @@ -23,36 +23,24 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ -#include #include +#include #ifdef __cplusplus extern "C" { #endif -/* - * Lock-free RCU queue using reference counting. Enqueue and dequeue operations - * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation - * keeps a dummy head node to ensure we can always update the queue locklessly. - * Given that this is a queue, the dummy head node must always advance as we - * dequeue entries. Therefore, we keep a reference count on each entry we are - * dequeueing, so they can be kept as dummy head node until the next dequeue, at - * which point their reference count will be decremented. - */ - struct cds_lfq_queue_rcu; struct cds_lfq_node_rcu { struct cds_lfq_node_rcu *next; - struct urcu_ref ref; - struct cds_lfq_queue_rcu *queue; - struct rcu_head rcu_head; }; struct cds_lfq_queue_rcu { - struct cds_lfq_node_rcu *head, *tail; - struct cds_lfq_node_rcu init; /* Dummy initialization node */ - void (*release)(struct urcu_ref *ref); + struct cds_lfq_node_rcu *head; + struct cds_lfq_node_rcu *tail; + void (*queue_call_rcu)(struct rcu_head *head, + void (*func)(struct rcu_head *head)); }; #ifdef _LGPL_SOURCE @@ -61,6 +49,7 @@ struct cds_lfq_queue_rcu { #define cds_lfq_node_init_rcu _cds_lfq_node_init_rcu #define cds_lfq_init_rcu _cds_lfq_init_rcu +#define cds_lfq_destroy_rcu _cds_lfq_destroy_rcu #define cds_lfq_enqueue_rcu _cds_lfq_enqueue_rcu #define cds_lfq_dequeue_rcu _cds_lfq_dequeue_rcu @@ -68,7 +57,14 @@ struct cds_lfq_queue_rcu { extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node); extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void (*release)(struct urcu_ref *ref)); + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))); +/* + * The queue should be emptied before calling destroy. + * + * Return 0 on success, -EPERM if queue is not empty. + */ +extern int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q); /* * Should be called under rcu read lock critical section. @@ -79,12 +75,9 @@ extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, /* * Should be called under rcu read lock critical section. * - * The entry returned by dequeue must be taken care of by doing a - * sequence of urcu_ref_put which release handler should do a call_rcu. - * - * In other words, the entry lfq node returned by dequeue must not be - * modified/re-used/freed until the reference count reaches zero and a grace - * period has elapsed (after the refcount reached 0). + * The caller must wait for a grace period to pass before freeing the returned + * node or modifying the cds_lfq_node_rcu structure. + * Returns NULL if queue is empty. */ extern struct cds_lfq_node_rcu *cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q); diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index b627e45..b83862f 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -26,52 +26,130 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ -#include +#include #include #include +#include /* A urcu implementation header should be already included. */ #ifdef __cplusplus extern "C" { #endif +struct cds_lfq_node_rcu_dummy { + struct cds_lfq_node_rcu parent; + struct rcu_head head; + struct cds_lfq_queue_rcu *q; +}; + /* - * Lock-free RCU queue using reference counting. Enqueue and dequeue operations - * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation - * keeps a dummy head node to ensure we can always update the queue locklessly. - * Given that this is a queue, the dummy head node must always advance as we - * dequeue entries. Therefore, we keep a reference count on each entry we are - * dequeueing, so they can be kept as dummy head node until the next dequeue, at - * which point their reference count will be decremented. + * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read + * lock to deal with cmpxchg ABA problem. This queue is *not* circular: + * head points to the oldest node, tail points to the newest node. + * Dummy nodes are kept to ensure enqueue and dequeue can always proceed + * concurrently. Keeping a separate head and tail helps with large + * queues: enqueue and dequeue can proceed concurrently without + * wrestling for exclusive access to the same variables. + * + * We keep two dummy nodes in the queue to distinguish between empty queue + * state and intermediate state while a dummy node dequeue/requeue is + * being performed. Dequeue retry if it detects that it would be + * dequeueing the last node (it means a dummy node dequeue-requeue is in + * progress). This ensures that there is always at least one node in + * the queue. In a situation where the two dummy nodes are being + * requeued (they therefore don't appear in the queue at a given + * moment), we are certain that there is at least one non-dummy node in + * the queue (ensured by the test for NULL next node upon dequeue). + * + * In the dequeue operation, we internally reallocate the dummy nodes + * upon dequeue/requeue and use call_rcu to free them after a grace + * period. */ -#define URCU_LFQ_PERMANENT_REF 128 +static inline +int is_dummy(struct cds_lfq_node_rcu *node) +{ + return ((unsigned long) node) & 0x1UL; +} + +static inline +struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, + struct cds_lfq_node_rcu *next) +{ + struct cds_lfq_node_rcu_dummy *dummy; + + dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy)); + dummy->parent.next = next; + dummy->q = q; + return (struct cds_lfq_node_rcu *) (((unsigned long) &dummy->parent) | 0x1UL); +} + +static inline +struct cds_lfq_node_rcu *get_node(struct cds_lfq_node_rcu *node) +{ + return (struct cds_lfq_node_rcu *) (((unsigned long )node) & ~0x1UL); +} + +static inline +void free_dummy(struct rcu_head *head) +{ + struct cds_lfq_node_rcu_dummy *dummy = + caa_container_of(head, struct cds_lfq_node_rcu_dummy, head); + free(dummy); +} + +static inline +void rcu_free_dummy(struct cds_lfq_node_rcu *node) +{ + struct cds_lfq_node_rcu_dummy *dummy; + dummy = caa_container_of(get_node(node), struct cds_lfq_node_rcu_dummy, + parent); + dummy->q->queue_call_rcu(&dummy->head, free_dummy); +} + +static inline void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) { node->next = NULL; - urcu_ref_init(&node->ref); } +static inline void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void (*release)(struct urcu_ref *ref)) + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))) +{ + q->tail = make_dummy(q, NULL); + q->head = make_dummy(q, q->tail); + q->queue_call_rcu = queue_call_rcu; +} + +/* + * The queue should be emptied before calling destroy. + * + * Return 0 on success, -EPERM if queue is not empty. + */ +static inline +int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) { - _cds_lfq_node_init_rcu(&q->init); - /* Make sure the initial node is never freed. */ - urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF); - q->head = q->tail = &q->init; - q->release = release; + struct cds_lfq_node_rcu *head, *next; + + head = rcu_dereference(q->head); + next = rcu_dereference(get_node(head)->next); + if (!(is_dummy(head) && is_dummy(next) && get_node(next)->next == NULL)) + return -EPERM; /* not empty */ + rcu_free_dummy(head); + rcu_free_dummy(next); + return 0; } /* * Should be called under rcu read lock critical section. */ +static inline void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) { - urcu_ref_get(&node->ref); - node->queue = q; - /* * uatomic_cmpxchg() implicit memory barrier orders earlier stores to * node before publication. @@ -81,23 +159,19 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *tail, *next; tail = rcu_dereference(q->tail); - /* - * Typically expect tail->next to be NULL. - */ - next = uatomic_cmpxchg(&tail->next, NULL, node); + next = uatomic_cmpxchg(&get_node(tail)->next, NULL, node); if (next == NULL) { /* * Tail was at the end of queue, we successfully - * appended to it. - * Now move tail (another enqueue might beat - * us to it, that's fine). + * appended to it. Now move tail (another + * enqueue might beat us to it, that's fine). */ (void) uatomic_cmpxchg(&q->tail, tail, node); return; } else { /* - * Failure to append to current tail. Help moving tail - * further and retry. + * Failure to append to current tail. + * Help moving tail further and retry. */ (void) uatomic_cmpxchg(&q->tail, tail, next); continue; @@ -108,31 +182,49 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, /* * Should be called under rcu read lock critical section. * - * The entry returned by dequeue must be taken care of by doing a - * sequence of urcu_ref_put which release handler should do a call_rcu. - * - * In other words, the entry lfq node returned by dequeue must not be - * modified/re-used/freed until the reference count reaches zero and a grace - * period has elapsed. + * The caller must wait for a grace period to pass before freeing the returned + * node or modifying the cds_lfq_node_rcu structure. + * Returns NULL if queue is empty. */ +static inline struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { for (;;) { struct cds_lfq_node_rcu *head, *next; head = rcu_dereference(q->head); - next = rcu_dereference(head->next); + next = rcu_dereference(get_node(head)->next); + if (is_dummy(head) && is_dummy(next) && get_node(next)->next == NULL) + return NULL; /* empty */ + /* + * We never, ever allow dequeue to get to a state where + * the queue is empty (we need at least one node in the + * queue). This is ensured by checking if the head next + * is NULL and retry in that case (this means a + * concurrent dummy node re-enqueue is in progress). + */ if (next) { if (uatomic_cmpxchg(&q->head, head, next) == head) { - urcu_ref_put(&head->ref, q->release); - return next; + if (is_dummy(head)) { + struct cds_lfq_node_rcu *node; + /* + * Requeue dummy. We need to + * reallocate to protect from + * ABA. + */ + rcu_free_dummy(head); + node = make_dummy(q, NULL); + _cds_lfq_enqueue_rcu(q, node); + continue; /* try again */ + } + return head; } else { /* Concurrently pushed, retry */ continue; } } else { - /* Empty */ - return NULL; + /* Dummy node re-enqueue is in progress, retry. */ + continue; } } }