X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu%2Fstatic%2Frculfqueue.h;h=af73c6f2ef7565dc69e16f62d539ded7ed66d583;hp=75df98582e00f5cc94ded1b29af2a0bc8867cf09;hb=a59f39055b5ecb77b68cf78b9839aa9e8e4ec332;hpb=a2e7bf9ce5de5113c7f59c380b0087e291cd603d diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index 75df985..af73c6f 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -26,52 +26,123 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ -#include +#include #include +#include #include -/* A urcu implementation header should be already included. */ +#include #ifdef __cplusplus extern "C" { #endif +struct cds_lfq_node_rcu_dummy { + struct cds_lfq_node_rcu parent; + struct rcu_head head; + struct cds_lfq_queue_rcu *q; +}; + /* - * Lock-free RCU queue using reference counting. Enqueue and dequeue operations - * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation - * keeps a dummy head node to ensure we can always update the queue locklessly. - * Given that this is a queue, the dummy head node must always advance as we - * dequeue entries. Therefore, we keep a reference count on each entry we are - * dequeueing, so they can be kept as dummy head node until the next dequeue, at - * which point their reference count will be decremented. + * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read + * lock to deal with cmpxchg ABA problem. This queue is *not* circular: + * head points to the oldest node, tail points to the newest node. + * A dummy node is kept to ensure enqueue and dequeue can always proceed + * concurrently. Keeping a separate head and tail helps with large + * queues: enqueue and dequeue can proceed concurrently without + * wrestling for exclusive access to the same variables. + * + * Dequeue retry if it detects that it would be dequeueing the last node + * (it means a dummy node dequeue-requeue is in progress). This ensures + * that there is always at least one node in the queue. + * + * In the dequeue operation, we internally reallocate the dummy node + * upon dequeue/requeue and use call_rcu to free the old one after a + * grace period. */ -#define URCU_LFQ_PERMANENT_REF 128 +static inline +struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, + struct cds_lfq_node_rcu *next) +{ + struct cds_lfq_node_rcu_dummy *dummy; + + dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy)); + assert(dummy); + dummy->parent.next = next; + dummy->parent.dummy = 1; + dummy->q = q; + return &dummy->parent; +} + +static inline +void free_dummy_cb(struct rcu_head *head) +{ + struct cds_lfq_node_rcu_dummy *dummy = + caa_container_of(head, struct cds_lfq_node_rcu_dummy, head); + free(dummy); +} + +static inline +void rcu_free_dummy(struct cds_lfq_node_rcu *node) +{ + struct cds_lfq_node_rcu_dummy *dummy; + assert(node->dummy); + dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); + dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb); +} + +static inline +void free_dummy(struct cds_lfq_node_rcu *node) +{ + struct cds_lfq_node_rcu_dummy *dummy; + + assert(node->dummy); + dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); + free(dummy); +} + +static inline void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) { node->next = NULL; - urcu_ref_init(&node->ref); + node->dummy = 0; } +static inline void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, - void (*release)(struct urcu_ref *ref)) + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))) +{ + q->tail = make_dummy(q, NULL); + q->head = q->tail; + q->queue_call_rcu = queue_call_rcu; +} + +/* + * The queue should be emptied before calling destroy. + * + * Return 0 on success, -EPERM if queue is not empty. + */ +static inline +int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) { - _cds_lfq_node_init_rcu(&q->init); - /* Make sure the initial node is never freed. */ - urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF); - q->head = q->tail = &q->init; - q->release = release; + struct cds_lfq_node_rcu *head; + + head = rcu_dereference(q->head); + if (!(head->dummy && head->next == NULL)) + return -EPERM; /* not empty */ + free_dummy(head); + return 0; } /* * Should be called under rcu read lock critical section. */ +static inline void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) { - urcu_ref_get(&node->ref); - node->queue = q; - /* * uatomic_cmpxchg() implicit memory barrier orders earlier stores to * node before publication. @@ -81,23 +152,19 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *tail, *next; tail = rcu_dereference(q->tail); - /* - * Typically expect tail->next to be NULL. - */ next = uatomic_cmpxchg(&tail->next, NULL, node); if (next == NULL) { /* * Tail was at the end of queue, we successfully - * appended to it. - * Now move tail (another enqueue might beat - * us to it, that's fine). + * appended to it. Now move tail (another + * enqueue might beat us to it, that's fine). */ (void) uatomic_cmpxchg(&q->tail, tail, node); return; } else { /* - * Failure to append to current tail. Help moving tail - * further and retry. + * Failure to append to current tail. + * Help moving tail further and retry. */ (void) uatomic_cmpxchg(&q->tail, tail, next); continue; @@ -105,16 +172,24 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, } } +static inline +void enqueue_dummy(struct cds_lfq_queue_rcu *q) +{ + struct cds_lfq_node_rcu *node; + + /* We need to reallocate to protect from ABA. */ + node = make_dummy(q, NULL); + _cds_lfq_enqueue_rcu(q, node); +} + /* * Should be called under rcu read lock critical section. * - * The entry returned by dequeue must be taken care of by doing a - * sequence of urcu_ref_put which release handler should do a call_rcu. - * - * In other words, the entry lfq node returned by dequeue must not be - * modified/re-used/freed until the reference count reaches zero and a grace - * period has elapsed. + * The caller must wait for a grace period to pass before freeing the returned + * node or modifying the cds_lfq_node_rcu structure. + * Returns NULL if queue is empty. */ +static inline struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { for (;;) { @@ -122,18 +197,27 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) head = rcu_dereference(q->head); next = rcu_dereference(head->next); - if (next) { - if (uatomic_cmpxchg(&q->head, head, next) == head) { - urcu_ref_put(&head->ref, q->release); - return next; - } else { - /* Concurrently pushed, retry */ - continue; - } - } else { - /* Empty */ - return NULL; + if (head->dummy && next == NULL) + return NULL; /* empty */ + /* + * We never, ever allow dequeue to get to a state where + * the queue is empty (we need at least one node in the + * queue). This is ensured by checking if the head next + * is NULL, which means we need to enqueue a dummy node + * before we can hope dequeuing anything. + */ + if (!next) { + enqueue_dummy(q); + next = rcu_dereference(head->next); + } + if (uatomic_cmpxchg(&q->head, head, next) != head) + continue; /* Concurrently pushed. */ + if (head->dummy) { + /* Free dummy after grace period. */ + rcu_free_dummy(head); + continue; /* try again */ } + return head; } }