X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu%2Fstatic%2Frculfqueue.h;h=99335c41ec41d33078279e2f843431111cf307e8;hp=0b83bc37ffb2c5b0e3b19218299286e571d70580;hb=7618919ae496bda84a2efa4f2ad0abe569892a9e;hpb=8f03ed0d5f238075858fdc6a9c64eb4327a39530 diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index 0b83bc3..99335c4 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -30,7 +30,6 @@ #include #include #include -/* A urcu implementation header should be already included. */ #ifdef __cplusplus extern "C" { @@ -46,32 +45,20 @@ struct cds_lfq_node_rcu_dummy { * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read * lock to deal with cmpxchg ABA problem. This queue is *not* circular: * head points to the oldest node, tail points to the newest node. - * Dummy nodes are kept to ensure enqueue and dequeue can always proceed + * A dummy node is kept to ensure enqueue and dequeue can always proceed * concurrently. Keeping a separate head and tail helps with large * queues: enqueue and dequeue can proceed concurrently without * wrestling for exclusive access to the same variables. * - * We keep two dummy nodes in the queue to distinguish between empty queue - * state and intermediate state while a dummy node dequeue/requeue is - * being performed. Dequeue retry if it detects that it would be - * dequeueing the last node (it means a dummy node dequeue-requeue is in - * progress). This ensures that there is always at least one node in - * the queue. In a situation where the two dummy nodes are being - * requeued (they therefore don't appear in the queue at a given - * moment), we are certain that there is at least one non-dummy node in - * the queue (ensured by the test for NULL next node upon dequeue). + * Dequeue retry if it detects that it would be dequeueing the last node + * (it means a dummy node dequeue-requeue is in progress). This ensures + * that there is always at least one node in the queue. * - * In the dequeue operation, we internally reallocate the dummy nodes - * upon dequeue/requeue and use call_rcu to free them after a grace - * period. + * In the dequeue operation, we internally reallocate the dummy node + * upon dequeue/requeue and use call_rcu to free the old one after a + * grace period. */ -static inline -int is_dummy(struct cds_lfq_node_rcu *node) -{ - return ((unsigned long) node) & 0x1UL; -} - static inline struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *next) @@ -81,18 +68,13 @@ struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy)); assert(dummy); dummy->parent.next = next; + dummy->parent.dummy = 1; dummy->q = q; - return (struct cds_lfq_node_rcu *) (((unsigned long) &dummy->parent) | 0x1UL); + return &dummy->parent; } static inline -struct cds_lfq_node_rcu *get_node(struct cds_lfq_node_rcu *node) -{ - return (struct cds_lfq_node_rcu *) (((unsigned long )node) & ~0x1UL); -} - -static inline -void free_dummy(struct rcu_head *head) +void free_dummy_cb(struct rcu_head *head) { struct cds_lfq_node_rcu_dummy *dummy = caa_container_of(head, struct cds_lfq_node_rcu_dummy, head); @@ -104,25 +86,33 @@ void rcu_free_dummy(struct cds_lfq_node_rcu *node) { struct cds_lfq_node_rcu_dummy *dummy; - dummy = caa_container_of(get_node(node), struct cds_lfq_node_rcu_dummy, - parent); - dummy->q->queue_call_rcu(&dummy->head, free_dummy); + assert(node->dummy); + dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); + call_rcu(&dummy->head, free_dummy_cb); +} + +static inline +void free_dummy(struct cds_lfq_node_rcu *node) +{ + struct cds_lfq_node_rcu_dummy *dummy; + + assert(node->dummy); + dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); + free(dummy); } static inline void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) { node->next = NULL; + node->dummy = 0; } static inline -void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void queue_call_rcu(struct rcu_head *head, - void (*func)(struct rcu_head *head))) +void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q) { q->tail = make_dummy(q, NULL); - q->head = make_dummy(q, q->tail); - q->queue_call_rcu = queue_call_rcu; + q->head = q->tail; } /* @@ -133,19 +123,17 @@ void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, static inline int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) { - struct cds_lfq_node_rcu *head, *next; + struct cds_lfq_node_rcu *head; head = rcu_dereference(q->head); - next = rcu_dereference(get_node(head)->next); - if (!(is_dummy(head) && is_dummy(next) && get_node(next)->next == NULL)) + if (!(head->dummy && head->next == NULL)) return -EPERM; /* not empty */ - rcu_free_dummy(head); - rcu_free_dummy(next); + free_dummy(head); return 0; } /* - * Should be called under rcu read lock critical section. + * Acts as a RCU reader. */ static inline void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, @@ -159,8 +147,9 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, for (;;) { struct cds_lfq_node_rcu *tail, *next; + rcu_read_lock(); tail = rcu_dereference(q->tail); - next = uatomic_cmpxchg(&get_node(tail)->next, NULL, node); + next = uatomic_cmpxchg(&tail->next, NULL, node); if (next == NULL) { /* * Tail was at the end of queue, we successfully @@ -168,6 +157,7 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, * enqueue might beat us to it, that's fine). */ (void) uatomic_cmpxchg(&q->tail, tail, node); + rcu_read_unlock(); return; } else { /* @@ -175,13 +165,24 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, * Help moving tail further and retry. */ (void) uatomic_cmpxchg(&q->tail, tail, next); + rcu_read_unlock(); continue; } } } +static inline +void enqueue_dummy(struct cds_lfq_queue_rcu *q) +{ + struct cds_lfq_node_rcu *node; + + /* We need to reallocate to protect from ABA. */ + node = make_dummy(q, NULL); + _cds_lfq_enqueue_rcu(q, node); +} + /* - * Should be called under rcu read lock critical section. + * Acts as a RCU reader. * * The caller must wait for a grace period to pass before freeing the returned * node or modifying the cds_lfq_node_rcu structure. @@ -193,40 +194,36 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) for (;;) { struct cds_lfq_node_rcu *head, *next; + rcu_read_lock(); head = rcu_dereference(q->head); - next = rcu_dereference(get_node(head)->next); - if (is_dummy(head) && is_dummy(next) && get_node(next)->next == NULL) + next = rcu_dereference(head->next); + if (head->dummy && next == NULL) { + rcu_read_unlock(); return NULL; /* empty */ + } /* * We never, ever allow dequeue to get to a state where * the queue is empty (we need at least one node in the * queue). This is ensured by checking if the head next - * is NULL and retry in that case (this means a - * concurrent dummy node re-enqueue is in progress). + * is NULL, which means we need to enqueue a dummy node + * before we can hope dequeuing anything. */ - if (next) { - if (uatomic_cmpxchg(&q->head, head, next) == head) { - if (is_dummy(head)) { - struct cds_lfq_node_rcu *node; - /* - * Requeue dummy. We need to - * reallocate to protect from - * ABA. - */ - rcu_free_dummy(head); - node = make_dummy(q, NULL); - _cds_lfq_enqueue_rcu(q, node); - continue; /* try again */ - } - return head; - } else { - /* Concurrently pushed, retry */ - continue; - } - } else { - /* Dummy node re-enqueue is in progress, retry. */ - continue; + if (!next) { + enqueue_dummy(q); + next = rcu_dereference(head->next); + } + if (uatomic_cmpxchg(&q->head, head, next) != head) { + rcu_read_unlock(); + continue; /* Concurrently pushed. */ + } + if (head->dummy) { + /* Free dummy after grace period. */ + rcu_free_dummy(head); + rcu_read_unlock(); + continue; /* try again */ } + rcu_read_unlock(); + return head; } }