Add lock-free RCU queue and stack
[urcu.git] / urcu / rculfqueue.h
... / ...
CommitLineData
1/*
2 * rculfqueue.h
3 *
4 * Userspace RCU library - Lock-Free RCU Queue
5 *
6 * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23#include <urcu/urcu_ref.h>
24#include <assert.h>
25
26#if (!defined(_GNU_SOURCE) && !defined(_LGPL_SOURCE))
27#error "Dynamic loader LGPL wrappers not implemented yet"
28#endif
29
30/*
31 * Lock-free RCU queue using reference counting. Enqueue and dequeue operations
32 * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation
33 * keeps a dummy head node to ensure we can always update the queue locklessly.
34 * Given that this is a queue, the dummy head node must always advance as we
35 * dequeue entries. Therefore, we keep a reference count on each entry we are
36 * dequeueing, so they can be kept as dummy head node until the next dequeue, at
37 * which point their reference count will be decremented.
38 */
39
40#define URCU_LFQ_PERMANENT_REF 128
41
42struct rcu_lfq_node {
43 struct rcu_lfq_node *next;
44 struct urcu_ref ref;
45};
46
47struct rcu_lfq_queue {
48 struct rcu_lfq_node *head, *tail;
49 struct rcu_lfq_node init; /* Dummy initialization node */
50};
51
52void rcu_lfq_node_init(struct rcu_lfq_node *node)
53{
54 node->next = NULL;
55 urcu_ref_init(&node->ref);
56}
57
58void rcu_lfq_init(struct rcu_lfq_queue *q)
59{
60 rcu_lfq_node_init(&q->init);
61 /* Make sure the initial node is never freed. */
62 urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF);
63 q->head = q->tail = &q->init;
64}
65
66void rcu_lfq_enqueue(struct rcu_lfq_queue *q, struct rcu_lfq_node *node)
67{
68 urcu_ref_get(&node->ref);
69
70 /*
71 * uatomic_cmpxchg() implicit memory barrier orders earlier stores to
72 * node before publication.
73 */
74
75 rcu_read_lock();
76 for (;;) {
77 struct rcu_lfq_node *tail = rcu_dereference(q->tail);
78 struct rcu_lfq_node *next;
79
80 /*
81 * Typically expect tail to be NULL.
82 */
83 next = uatomic_cmpxchg(&tail->next, NULL, node);
84 if (next == NULL) {
85 /*
86 * Tail was at the end of queue, we successfully
87 * appended to it.
88 * Now move tail (another enqueue might beat
89 * us to it, that's fine).
90 */
91 uatomic_cmpxchg(&q->tail, tail, node);
92 rcu_read_unlock();
93 return;
94 } else {
95 /*
96 * Failure to append to current tail. Help moving tail
97 * further and retry.
98 */
99 uatomic_cmpxchg(&q->tail, tail, next);
100 continue;
101 }
102 }
103}
104
105/*
106 * The entry returned by dequeue must be taken care of by doing a urcu_ref_put,
107 * which calls the release primitive when the reference count drops to zero. A
108 * grace period must be waited before performing the actual memory reclamation
109 * in the release primitive.
110 * The entry lfq node returned by dequeue must no be re-used before the
111 * reference count reaches zero.
112 */
113struct rcu_lfq_node *
114rcu_lfq_dequeue(struct rcu_lfq_queue *q, void (*release)(struct urcu_ref *))
115{
116 rcu_read_lock();
117 for (;;) {
118 struct rcu_lfq_node *head = rcu_dereference(q->head);
119 struct rcu_lfq_node *next = rcu_dereference(head->next);
120
121 if (next) {
122 if (uatomic_cmpxchg(&q->head, head, next) == head) {
123 rcu_read_unlock();
124 urcu_ref_put(&head->ref, release);
125 return next;
126 } else {
127 /* Concurrently pushed, retry */
128 continue;
129 }
130 } else {
131 /* Empty */
132 rcu_read_unlock();
133 return NULL;
134 }
135 }
136}
This page took 0.022594 seconds and 4 git commands to generate.