WF queue cleanup
[urcu.git] / urcu / wfqueue.h
... / ...
CommitLineData
1#ifndef _URCU_WFQUEUE_H
2#define _URCU_WFQUEUE_H
3
4/*
5 * wfqueue.h
6 *
7 * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
8 *
9 * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 *
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License as published by the Free Software Foundation; either
14 * version 2.1 of the License, or (at your option) any later version.
15 *
16 * This library is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public
22 * License along with this library; if not, write to the Free Software
23 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 */
25
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <stddef.h>
#include <urcu/compiler.h>
29
30#ifdef __cplusplus
31extern "C" {
32#endif
33
34#if (!defined(_GNU_SOURCE) && !defined(_LGPL_SOURCE))
35#error "Dynamic loader LGPL wrappers not implemented yet"
36#endif
37
/*
 * Tuning of the dequeuer's adaptive wait, used while an enqueuer has swapped
 * the tail but has not yet linked its node.
 */
#define WFQ_ADAPT_ATTEMPTS	10	/* Busy-wait iterations before sleeping */
#define WFQ_WAIT		10	/* Sleep duration between rounds, in ms */
40
/*
 * Queue with wait-free enqueue/blocking dequeue.
 * This implementation adds a dummy head node when the queue is empty to ensure
 * we can always update the queue locklessly.
 *
 * Inspired by the half-wait-free/half-blocking queue implementation done by
 * Paul E. McKenney.
 */
49
/*
 * Queue linkage. "next" points to the node enqueued after this one, or is NULL
 * while no successor has been linked yet.
 */
struct wfq_node {
	struct wfq_node *next;
};
53
54struct wfq_queue {
55 struct wfq_node *head, **tail;
56 struct wfq_node dummy; /* Dummy node */
57 pthread_mutex_t lock;
58};
59
60void wfq_node_init(struct wfq_node *node)
61{
62 node->next = NULL;
63}
64
65void wfq_init(struct wfq_queue *q)
66{
67 int ret;
68
69 wfq_node_init(&q->dummy);
70 /* Set queue head and tail */
71 q->head = &q->dummy;
72 q->tail = &q->dummy.next;
73 ret = pthread_mutex_init(&q->lock, NULL);
74 assert(!ret);
75}
76
77void wfq_enqueue(struct wfq_queue *q, struct wfq_node *node)
78{
79 struct wfq_node **old_tail;
80
81 /*
82 * uatomic_xchg() implicit memory barrier orders earlier stores to data
83 * structure containing node and setting node->next to NULL before
84 * publication.
85 */
86 old_tail = uatomic_xchg(&q->tail, node);
87 /*
88 * At this point, dequeuers see a NULL old_tail->next, which indicates
89 * that the queue is being appended to. The following store will append
90 * "node" to the queue from a dequeuer perspective.
91 */
92 STORE_SHARED(*old_tail, node);
93}
94
95/*
96 * It is valid to reuse and free a dequeued node immediately.
97 *
98 * No need to go on a waitqueue here, as there is no possible state in which the
99 * list could cause dequeue to busy-loop needlessly while waiting for another
100 * thread to be scheduled. The queue appears empty until tail->next is set by
101 * enqueue.
102 */
103struct wfq_node *
104__wfq_dequeue_blocking(struct wfq_queue *q)
105{
106 struct wfq_node *node, *next;
107 int attempt = 0;
108
109 /*
110 * Queue is empty if it only contains the dummy node.
111 */
112 if (q->head == &q->dummy && LOAD_SHARED(q->tail) == &q->dummy.next)
113 return NULL;
114 node = q->head;
115
116 /*
117 * Adaptative busy-looping waiting for enqueuer to complete enqueue.
118 */
119 while ((next = LOAD_SHARED(node->next)) == NULL) {
120 if (++attempt >= WFQ_ADAPT_ATTEMPTS) {
121 poll(NULL, 0, WFQ_WAIT); /* Wait for 10ms */
122 attempt = 0;
123 } else
124 cpu_relax();
125 }
126 /*
127 * Move queue head forward.
128 */
129 q->head = next;
130 /*
131 * Requeue dummy node if we just dequeued it.
132 */
133 if (node == &q->dummy) {
134 wfq_node_init(node);
135 wfq_enqueue(q, node);
136 return __wfq_dequeue_blocking(q);
137 }
138 return node;
139}
140
141struct wfq_node *
142wfq_dequeue_blocking(struct wfq_queue *q)
143{
144 struct wfq_node *retnode;
145 int ret;
146
147 ret = pthread_mutex_lock(&q->lock);
148 assert(!ret);
149 retnode = __wfq_dequeue_blocking(q);
150 ret = pthread_mutex_unlock(&q->lock);
151 assert(!ret);
152 return retnode;
153}
154
155#ifdef __cplusplus
156}
157#endif
158
159#endif /* _URCU_WFQUEUE_H */
This page took 0.022305 seconds and 4 git commands to generate.