diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h
index b627e45..99335c4 100644
--- a/urcu/static/rculfqueue.h
+++ b/urcu/static/rculfqueue.h
@@ -26,52 +26,119 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
-#include <urcu/ref.h>
+#include <urcu-call-rcu.h>
 #include <urcu/uatomic_arch.h>
 #include <assert.h>
-/* A urcu implementation header should be already included. */
+#include <errno.h>
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
+struct cds_lfq_node_rcu_dummy {
+	struct cds_lfq_node_rcu parent;
+	struct rcu_head head;
+	struct cds_lfq_queue_rcu *q;
+};
+
 /*
- * Lock-free RCU queue using reference counting. Enqueue and dequeue operations
- * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation
- * keeps a dummy head node to ensure we can always update the queue locklessly.
- * Given that this is a queue, the dummy head node must always advance as we
- * dequeue entries. Therefore, we keep a reference count on each entry we are
- * dequeueing, so they can be kept as dummy head node until the next dequeue, at
- * which point their reference count will be decremented.
+ * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read
+ * lock to deal with the cmpxchg ABA problem. This queue is *not* circular:
+ * head points to the oldest node, tail points to the newest node.
+ * A dummy node is kept to ensure enqueue and dequeue can always proceed
+ * concurrently. Keeping a separate head and tail helps with large
+ * queues: enqueue and dequeue can proceed concurrently without
+ * wrestling for exclusive access to the same variables.
+ *
+ * Dequeue retries if it detects that it would be dequeueing the last node
+ * (this means a dummy node dequeue-requeue is in progress). This ensures
+ * that there is always at least one node in the queue.
+ *
+ * In the dequeue operation, we internally reallocate the dummy node
+ * upon dequeue/requeue and use call_rcu to free the old one after a
+ * grace period.
  */
-#define URCU_LFQ_PERMANENT_REF	128
+static inline
+struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q,
+				    struct cds_lfq_node_rcu *next)
+{
+	struct cds_lfq_node_rcu_dummy *dummy;
+
+	dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy));
+	assert(dummy);
+	dummy->parent.next = next;
+	dummy->parent.dummy = 1;
+	dummy->q = q;
+	return &dummy->parent;
+}
+
+static inline
+void free_dummy_cb(struct rcu_head *head)
+{
+	struct cds_lfq_node_rcu_dummy *dummy =
+		caa_container_of(head, struct cds_lfq_node_rcu_dummy, head);
+	free(dummy);
+}
+
+static inline
+void rcu_free_dummy(struct cds_lfq_node_rcu *node)
+{
+	struct cds_lfq_node_rcu_dummy *dummy;
+
+	assert(node->dummy);
+	dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
+	call_rcu(&dummy->head, free_dummy_cb);
+}
+
+static inline
+void free_dummy(struct cds_lfq_node_rcu *node)
+{
+	struct cds_lfq_node_rcu_dummy *dummy;
+
+	assert(node->dummy);
+	dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
+	free(dummy);
+}
+
+static inline
 void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
 {
 	node->next = NULL;
-	urcu_ref_init(&node->ref);
+	node->dummy = 0;
+}
+
+static inline
+void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+{
+	q->tail = make_dummy(q, NULL);
+	q->head = q->tail;
 }
 
-void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
-		       void (*release)(struct urcu_ref *ref))
+/*
+ * The queue should be emptied before calling destroy.
+ *
+ * Return 0 on success, -EPERM if queue is not empty.
+ */
+static inline
+int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
 {
-	_cds_lfq_node_init_rcu(&q->init);
-	/* Make sure the initial node is never freed. */
-	urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF);
-	q->head = q->tail = &q->init;
-	q->release = release;
+	struct cds_lfq_node_rcu *head;
+
+	head = rcu_dereference(q->head);
+	if (!(head->dummy && head->next == NULL))
+		return -EPERM;	/* not empty */
+	free_dummy(head);
+	return 0;
 }
 
 /*
- * Should be called under rcu read lock critical section.
+ * Acts as a RCU reader.
  */
+static inline
 void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
 			  struct cds_lfq_node_rcu *node)
 {
-	urcu_ref_get(&node->ref);
-	node->queue = q;
-
 	/*
 	 * uatomic_cmpxchg() implicit memory barrier orders earlier stores to
 	 * node before publication.
@@ -80,60 +147,83 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
 	 */
 
 	for (;;) {
 		struct cds_lfq_node_rcu *tail, *next;
 
+		rcu_read_lock();
 		tail = rcu_dereference(q->tail);
-		/*
-		 * Typically expect tail->next to be NULL.
-		 */
 		next = uatomic_cmpxchg(&tail->next, NULL, node);
 		if (next == NULL) {
 			/*
 			 * Tail was at the end of queue, we successfully
-			 * appended to it.
-			 * Now move tail (another enqueue might beat
-			 * us to it, that's fine).
+			 * appended to it. Now move tail (another
+			 * enqueue might beat us to it, that's fine).
 			 */
 			(void) uatomic_cmpxchg(&q->tail, tail, node);
+			rcu_read_unlock();
 			return;
 		} else {
 			/*
-			 * Failure to append to current tail. Help moving tail
-			 * further and retry.
+			 * Failure to append to current tail.
+			 * Help moving tail further and retry.
 			 */
 			(void) uatomic_cmpxchg(&q->tail, tail, next);
+			rcu_read_unlock();
 			continue;
 		}
 	}
 }
 
+static inline
+void enqueue_dummy(struct cds_lfq_queue_rcu *q)
+{
+	struct cds_lfq_node_rcu *node;
+
+	/* We need to reallocate to protect from ABA. */
+	node = make_dummy(q, NULL);
+	_cds_lfq_enqueue_rcu(q, node);
+}
+
 /*
- * Should be called under rcu read lock critical section.
- *
- * The entry returned by dequeue must be taken care of by doing a
- * sequence of urcu_ref_put which release handler should do a call_rcu.
+ * Acts as a RCU reader.
  *
- * In other words, the entry lfq node returned by dequeue must not be
- * modified/re-used/freed until the reference count reaches zero and a grace
- * period has elapsed.
+ * The caller must wait for a grace period to pass before freeing the returned
+ * node or modifying the cds_lfq_node_rcu structure.
+ * Returns NULL if queue is empty.
  */
+static inline
 struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
 {
 	for (;;) {
 		struct cds_lfq_node_rcu *head, *next;
 
+		rcu_read_lock();
 		head = rcu_dereference(q->head);
 		next = rcu_dereference(head->next);
-		if (next) {
-			if (uatomic_cmpxchg(&q->head, head, next) == head) {
-				urcu_ref_put(&head->ref, q->release);
-				return next;
-			} else {
-				/* Concurrently pushed, retry */
-				continue;
-			}
-		} else {
-			/* Empty */
-			return NULL;
+		if (head->dummy && next == NULL) {
+			rcu_read_unlock();
+			return NULL;	/* empty */
+		}
+		/*
+		 * We never, ever allow dequeue to get to a state where
+		 * the queue is empty (we need at least one node in the
+		 * queue). This is ensured by checking if the head next
+		 * is NULL, which means we need to enqueue a dummy node
+		 * before we can hope to dequeue anything.
+		 */
+		if (!next) {
+			enqueue_dummy(q);
+			next = rcu_dereference(head->next);
+		}
+		if (uatomic_cmpxchg(&q->head, head, next) != head) {
+			rcu_read_unlock();
+			continue;	/* Concurrently pushed. */
+		}
+		if (head->dummy) {
+			/* Free dummy after grace period. */
+			rcu_free_dummy(head);
+			rcu_read_unlock();
+			continue;	/* try again */
 		}
+		rcu_read_unlock();
+		return head;
 	}
 }
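
For reference, this is how the post-patch API is driven, per the header comments above: enqueue and dequeue act as RCU readers internally, and a dequeued node may only be freed after a grace period. A minimal sketch follows, assuming the public cds_lfq_*() wrappers in urcu/rculfqueue.h mirror the _cds_lfq_*() functions of this patch (single-argument init, destroy returning -EPERM on non-empty) and the default urcu flavour; struct myitem and free_myitem() are illustrative names, not part of the patch.

#include <stdio.h>
#include <stdlib.h>
#include <urcu.h>		/* default urcu flavour */
#include <urcu-call-rcu.h>	/* call_rcu(), struct rcu_head */
#include <urcu/rculfqueue.h>	/* cds_lfq_* queue API */
#include <urcu/compiler.h>	/* caa_container_of() */

struct myitem {				/* illustrative caller-side type */
	int value;
	struct cds_lfq_node_rcu node;	/* embedded queue node */
	struct rcu_head rcu;		/* deferred-free handle */
};

static void free_myitem(struct rcu_head *head)
{
	free(caa_container_of(head, struct myitem, rcu));
}

int main(void)
{
	struct cds_lfq_queue_rcu q;
	struct cds_lfq_node_rcu *qnode;
	struct myitem *item;

	rcu_register_thread();	/* every thread using the queue */
	cds_lfq_init_rcu(&q);

	item = malloc(sizeof(*item));
	item->value = 42;
	cds_lfq_node_init_rcu(&item->node);
	/* Enqueue/dequeue take the RCU read-side lock internally. */
	cds_lfq_enqueue_rcu(&q, &item->node);

	qnode = cds_lfq_dequeue_rcu(&q);
	if (qnode) {
		struct myitem *i =
			caa_container_of(qnode, struct myitem, node);
		printf("dequeued %d\n", i->value);
		/* Free only after a grace period has elapsed. */
		call_rcu(&i->rcu, free_myitem);
	}

	if (cds_lfq_destroy_rcu(&q))	/* -EPERM if not empty */
		abort();
	rcu_unregister_thread();
	return 0;
}

Note the asymmetry the patch introduces: enqueue needs no special care from the caller, but the node returned by dequeue must not be modified, reused or freed until a grace period has passed, which is what the call_rcu() deferral above provides.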