RCU lock-free queue: don't hold RCU read lock across retry
[userspace-rcu.git] / urcu / rculfqueue.h
... / ...
CommitLineData
1/*
2 * rculfqueue.h
3 *
4 * Userspace RCU library - Lock-Free RCU Queue
5 *
6 * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23#include <urcu/urcu_ref.h>
24#include <assert.h>
25
26#if (!defined(_GNU_SOURCE) && !defined(_LGPL_SOURCE))
27#error "Dynamic loader LGPL wrappers not implemented yet"
28#endif
29
/*
 * Lock-free RCU queue using reference counting. Enqueue and dequeue operations
 * hold an RCU read lock to deal with the cmpxchg ABA problem. This
 * implementation keeps a dummy head node to ensure we can always update the
 * queue locklessly. Given that this is a queue, the dummy head node must
 * always advance as we dequeue entries. Therefore, we keep a reference count
 * on each entry we are dequeueing, so it can be kept as the dummy head node
 * until the next dequeue, at which point its reference count will be
 * decremented.
 */
39
/*
 * Reference count assigned to the dummy node embedded in the queue. It is
 * set well above the one reference dropped when that node stops being the
 * head, so its count never reaches zero and it is never released.
 */
#define URCU_LFQ_PERMANENT_REF	128
41
42struct rcu_lfq_node {
43 struct rcu_lfq_node *next;
44 struct urcu_ref ref;
45};
46
47struct rcu_lfq_queue {
48 struct rcu_lfq_node *head, *tail;
49 struct rcu_lfq_node init; /* Dummy initialization node */
50};
51
52void rcu_lfq_node_init(struct rcu_lfq_node *node)
53{
54 node->next = NULL;
55 urcu_ref_init(&node->ref);
56}
57
58void rcu_lfq_init(struct rcu_lfq_queue *q)
59{
60 rcu_lfq_node_init(&q->init);
61 /* Make sure the initial node is never freed. */
62 urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF);
63 q->head = q->tail = &q->init;
64}
65
66void rcu_lfq_enqueue(struct rcu_lfq_queue *q, struct rcu_lfq_node *node)
67{
68 urcu_ref_get(&node->ref);
69
70 /*
71 * uatomic_cmpxchg() implicit memory barrier orders earlier stores to
72 * node before publication.
73 */
74
75 for (;;) {
76 struct rcu_lfq_node *tail, *next;
77
78 rcu_read_lock();
79 tail = rcu_dereference(q->tail);
80 /*
81 * Typically expect tail->next to be NULL.
82 */
83 next = uatomic_cmpxchg(&tail->next, NULL, node);
84 if (next == NULL) {
85 /*
86 * Tail was at the end of queue, we successfully
87 * appended to it.
88 * Now move tail (another enqueue might beat
89 * us to it, that's fine).
90 */
91 uatomic_cmpxchg(&q->tail, tail, node);
92 rcu_read_unlock();
93 return;
94 } else {
95 /*
96 * Failure to append to current tail. Help moving tail
97 * further and retry.
98 */
99 uatomic_cmpxchg(&q->tail, tail, next);
100 rcu_read_unlock();
101 continue;
102 }
103 }
104}
105
106/*
107 * The entry returned by dequeue must be taken care of by doing a urcu_ref_put,
108 * which calls the release primitive when the reference count drops to zero. A
109 * grace period must be waited before performing the actual memory reclamation
110 * in the release primitive.
111 * The entry lfq node returned by dequeue must not be re-used before the
112 * reference count reaches zero.
113 */
114struct rcu_lfq_node *
115rcu_lfq_dequeue(struct rcu_lfq_queue *q, void (*release)(struct urcu_ref *))
116{
117 for (;;) {
118 struct rcu_lfq_node *head, *next;
119
120 rcu_read_lock();
121 head = rcu_dereference(q->head);
122 next = rcu_dereference(head->next);
123 if (next) {
124 if (uatomic_cmpxchg(&q->head, head, next) == head) {
125 rcu_read_unlock();
126 urcu_ref_put(&head->ref, release);
127 return next;
128 } else {
129 /* Concurrently pushed, retry */
130 rcu_read_unlock();
131 continue;
132 }
133 } else {
134 /* Empty */
135 rcu_read_unlock();
136 return NULL;
137 }
138 }
139}
This page took 0.023093 seconds and 4 git commands to generate.