From: Mathieu Desnoyers Date: Wed, 17 Aug 2011 10:07:35 +0000 (-0400) Subject: Merge branch 'master' into lfqueue-dev X-Git-Tag: v0.6.5~51 X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=commitdiff_plain;h=e7385b904e9fb77e94a85a7c0d092cc6039dd948;hp=83a2c42177d2b28890bc3674d30f9ed5f8edb0a8 Merge branch 'master' into lfqueue-dev --- diff --git a/rculfqueue.c b/rculfqueue.c index 0daee5d..09ba9cf 100644 --- a/rculfqueue.c +++ b/rculfqueue.c @@ -39,9 +39,15 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) } void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void (*release)(struct urcu_ref *ref)) + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))) { - _cds_lfq_init_rcu(q, release); + _cds_lfq_init_rcu(q, queue_call_rcu); +} + +int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) +{ + return _cds_lfq_destroy_rcu(q); } void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c index cb50586..b61a7d4 100644 --- a/tests/test_urcu_lfq.c +++ b/tests/test_urcu_lfq.c @@ -66,6 +66,7 @@ static inline pid_t gettid(void) #endif #include #include +#include static volatile int test_go, test_stop; @@ -204,20 +205,6 @@ fail: } -static void rcu_free_node(struct rcu_head *head) -{ - struct cds_lfq_node_rcu *node = - caa_container_of(head, struct cds_lfq_node_rcu, rcu_head); - free(node); -} - -static void ref_release_node(struct urcu_ref *ref) -{ - struct cds_lfq_node_rcu *node = - caa_container_of(ref, struct cds_lfq_node_rcu, ref); - call_rcu(&node->rcu_head, rcu_free_node); -} - void *thr_dequeuer(void *_count) { unsigned long long *count = _count; @@ -228,6 +215,11 @@ void *thr_dequeuer(void *_count) set_affinity(); + ret = rcu_defer_register_thread(); + if (ret) { + printf("Error in rcu_defer_register_thread\n"); + exit(-1); + } rcu_register_thread(); while (!test_go) @@ -243,7 +235,7 @@ void *thr_dequeuer(void *_count) rcu_read_unlock(); if (node) { - urcu_ref_put(&node->ref, ref_release_node); + defer_rcu(free, node); nr_successful_dequeues++; } @@ -255,6 +247,7 @@ void *thr_dequeuer(void *_count) } rcu_unregister_thread(); + rcu_defer_unregister_thread(); printf_verbose("dequeuer thread_end, thread id : %lx, tid %lu, " "dequeues %llu, successful_dequeues %llu\n", pthread_self(), (unsigned long)gettid(), nr_dequeues, @@ -264,12 +257,6 @@ void *thr_dequeuer(void *_count) return ((void*)2); } -static void release_node(struct urcu_ref *ref) -{ - struct cds_lfq_node_rcu *node = caa_container_of(ref, struct cds_lfq_node_rcu, ref); - free(node); -} - void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues) { struct cds_lfq_node_rcu *node; @@ -279,7 +266,7 @@ void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues) node = cds_lfq_dequeue_rcu(q); rcu_read_unlock(); if (node) { - urcu_ref_put(&node->ref, release_node); + free(node); /* no more concurrent access */ (*nr_dequeues)++; } } while (node); @@ -376,7 +363,7 @@ int main(int argc, char **argv) tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers); count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers); count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers); - cds_lfq_init_rcu(&q, ref_release_node); + cds_lfq_init_rcu(&q, call_rcu); next_aff = 0; @@ -421,6 +408,8 @@ int main(int argc, char **argv) } test_end(&q, &end_dequeues); + err = cds_lfq_destroy_rcu(&q); + assert(!err); printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues, tot_dequeues); diff --git a/urcu/rculfqueue.h b/urcu/rculfqueue.h index fbef6f9..598fa50 100644 --- a/urcu/rculfqueue.h +++ b/urcu/rculfqueue.h @@ -23,36 +23,24 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ -#include #include +#include #ifdef __cplusplus extern "C" { #endif -/* - * Lock-free RCU queue using reference counting. Enqueue and dequeue operations - * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation - * keeps a dummy head node to ensure we can always update the queue locklessly. - * Given that this is a queue, the dummy head node must always advance as we - * dequeue entries. Therefore, we keep a reference count on each entry we are - * dequeueing, so they can be kept as dummy head node until the next dequeue, at - * which point their reference count will be decremented. - */ - struct cds_lfq_queue_rcu; struct cds_lfq_node_rcu { struct cds_lfq_node_rcu *next; - struct urcu_ref ref; - struct cds_lfq_queue_rcu *queue; - struct rcu_head rcu_head; + int dummy; }; struct cds_lfq_queue_rcu { struct cds_lfq_node_rcu *head, *tail; - struct cds_lfq_node_rcu init; /* Dummy initialization node */ - void (*release)(struct urcu_ref *ref); + void (*queue_call_rcu)(struct rcu_head *head, + void (*func)(struct rcu_head *head)); }; #ifdef _LGPL_SOURCE @@ -61,6 +49,7 @@ struct cds_lfq_queue_rcu { #define cds_lfq_node_init_rcu _cds_lfq_node_init_rcu #define cds_lfq_init_rcu _cds_lfq_init_rcu +#define cds_lfq_destroy_rcu _cds_lfq_destroy_rcu #define cds_lfq_enqueue_rcu _cds_lfq_enqueue_rcu #define cds_lfq_dequeue_rcu _cds_lfq_dequeue_rcu @@ -68,7 +57,14 @@ struct cds_lfq_queue_rcu { extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node); extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void (*release)(struct urcu_ref *ref)); + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))); +/* + * The queue should be emptied before calling destroy. + * + * Return 0 on success, -EPERM if queue is not empty. + */ +extern int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q); /* * Should be called under rcu read lock critical section. @@ -79,12 +75,9 @@ extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, /* * Should be called under rcu read lock critical section. * - * The entry returned by dequeue must be taken care of by doing a - * sequence of urcu_ref_put which release handler should do a call_rcu. - * - * In other words, the entry lfq node returned by dequeue must not be - * modified/re-used/freed until the reference count reaches zero and a grace - * period has elapsed (after the refcount reached 0). + * The caller must wait for a grace period to pass before freeing the returned + * node or modifying the cds_lfq_node_rcu structure. + * Returns NULL if queue is empty. */ extern struct cds_lfq_node_rcu *cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q); diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index b627e45..fea6110 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -26,52 +26,122 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ -#include +#include #include #include -/* A urcu implementation header should be already included. */ +#include #ifdef __cplusplus extern "C" { #endif +struct cds_lfq_node_rcu_dummy { + struct cds_lfq_node_rcu parent; + struct rcu_head head; + struct cds_lfq_queue_rcu *q; +}; + /* - * Lock-free RCU queue using reference counting. Enqueue and dequeue operations - * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation - * keeps a dummy head node to ensure we can always update the queue locklessly. - * Given that this is a queue, the dummy head node must always advance as we - * dequeue entries. Therefore, we keep a reference count on each entry we are - * dequeueing, so they can be kept as dummy head node until the next dequeue, at - * which point their reference count will be decremented. + * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read + * lock to deal with cmpxchg ABA problem. This queue is *not* circular: + * head points to the oldest node, tail points to the newest node. + * A dummy node is kept to ensure enqueue and dequeue can always proceed + * concurrently. Keeping a separate head and tail helps with large + * queues: enqueue and dequeue can proceed concurrently without + * wrestling for exclusive access to the same variables. + * + * Dequeue retry if it detects that it would be dequeueing the last node + * (it means a dummy node dequeue-requeue is in progress). This ensures + * that there is always at least one node in the queue. + * + * In the dequeue operation, we internally reallocate the dummy node + * upon dequeue/requeue and use call_rcu to free the old one after a + * grace period. */ -#define URCU_LFQ_PERMANENT_REF 128 +static inline +struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, + struct cds_lfq_node_rcu *next) +{ + struct cds_lfq_node_rcu_dummy *dummy; + + dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy)); + assert(dummy); + dummy->parent.next = next; + dummy->parent.dummy = 1; + dummy->q = q; + return &dummy->parent; +} + +static inline +void free_dummy_cb(struct rcu_head *head) +{ + struct cds_lfq_node_rcu_dummy *dummy = + caa_container_of(head, struct cds_lfq_node_rcu_dummy, head); + free(dummy); +} + +static inline +void rcu_free_dummy(struct cds_lfq_node_rcu *node) +{ + struct cds_lfq_node_rcu_dummy *dummy; + assert(node->dummy); + dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); + dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb); +} + +static inline +void free_dummy(struct cds_lfq_node_rcu *node) +{ + struct cds_lfq_node_rcu_dummy *dummy; + + assert(node->dummy); + dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); + free(dummy); +} + +static inline void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) { node->next = NULL; - urcu_ref_init(&node->ref); + node->dummy = 0; } +static inline void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void (*release)(struct urcu_ref *ref)) + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))) +{ + q->tail = make_dummy(q, NULL); + q->head = q->tail; + q->queue_call_rcu = queue_call_rcu; +} + +/* + * The queue should be emptied before calling destroy. + * + * Return 0 on success, -EPERM if queue is not empty. + */ +static inline +int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) { - _cds_lfq_node_init_rcu(&q->init); - /* Make sure the initial node is never freed. */ - urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF); - q->head = q->tail = &q->init; - q->release = release; + struct cds_lfq_node_rcu *head; + + head = rcu_dereference(q->head); + if (!(head->dummy && head->next == NULL)) + return -EPERM; /* not empty */ + free_dummy(head); + return 0; } /* * Should be called under rcu read lock critical section. */ +static inline void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) { - urcu_ref_get(&node->ref); - node->queue = q; - /* * uatomic_cmpxchg() implicit memory barrier orders earlier stores to * node before publication. @@ -81,23 +151,19 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *tail, *next; tail = rcu_dereference(q->tail); - /* - * Typically expect tail->next to be NULL. - */ next = uatomic_cmpxchg(&tail->next, NULL, node); if (next == NULL) { /* * Tail was at the end of queue, we successfully - * appended to it. - * Now move tail (another enqueue might beat - * us to it, that's fine). + * appended to it. Now move tail (another + * enqueue might beat us to it, that's fine). */ (void) uatomic_cmpxchg(&q->tail, tail, node); return; } else { /* - * Failure to append to current tail. Help moving tail - * further and retry. + * Failure to append to current tail. + * Help moving tail further and retry. */ (void) uatomic_cmpxchg(&q->tail, tail, next); continue; @@ -105,16 +171,24 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, } } +static inline +void enqueue_dummy(struct cds_lfq_queue_rcu *q) +{ + struct cds_lfq_node_rcu *node; + + /* We need to reallocate to protect from ABA. */ + node = make_dummy(q, NULL); + _cds_lfq_enqueue_rcu(q, node); +} + /* * Should be called under rcu read lock critical section. * - * The entry returned by dequeue must be taken care of by doing a - * sequence of urcu_ref_put which release handler should do a call_rcu. - * - * In other words, the entry lfq node returned by dequeue must not be - * modified/re-used/freed until the reference count reaches zero and a grace - * period has elapsed. + * The caller must wait for a grace period to pass before freeing the returned + * node or modifying the cds_lfq_node_rcu structure. + * Returns NULL if queue is empty. */ +static inline struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { for (;;) { @@ -122,18 +196,27 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) head = rcu_dereference(q->head); next = rcu_dereference(head->next); - if (next) { - if (uatomic_cmpxchg(&q->head, head, next) == head) { - urcu_ref_put(&head->ref, q->release); - return next; - } else { - /* Concurrently pushed, retry */ - continue; - } - } else { - /* Empty */ - return NULL; + if (head->dummy && next == NULL) + return NULL; /* empty */ + /* + * We never, ever allow dequeue to get to a state where + * the queue is empty (we need at least one node in the + * queue). This is ensured by checking if the head next + * is NULL, which means we need to enqueue a dummy node + * before we can hope dequeuing anything. + */ + if (!next) { + enqueue_dummy(q); + next = rcu_dereference(head->next); + } + if (uatomic_cmpxchg(&q->head, head, next) != head) + continue; /* Concurrently pushed. */ + if (head->dummy) { + /* Free dummy after grace period. */ + rcu_free_dummy(head); + continue; /* try again */ } + return head; } } diff --git a/urcu/static/rculfstack.h b/urcu/static/rculfstack.h index 3f48b7e..ba26231 100644 --- a/urcu/static/rculfstack.h +++ b/urcu/static/rculfstack.h @@ -27,7 +27,6 @@ */ #include -/* A urcu implementation header should be already included. */ #ifdef __cplusplus extern "C" {