/*
 * urcu-qsbr.c
 *
 * Userspace RCU QSBR library
 *
 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
 */

#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <poll.h>

#include "urcu/wfqueue.h"
#include "urcu/map/urcu-qsbr.h"
#define BUILD_QSBR_LIB
#include "urcu/static/urcu-qsbr.h"
#include "urcu-pointer.h"
#include "urcu/tls-compat.h"

/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
#include "urcu-qsbr.h"
#define _LGPL_SOURCE

void __attribute__((destructor)) rcu_exit(void);

static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;

int32_t gp_futex;

/*
 * Global grace period counter.
 */
unsigned long rcu_gp_ctr = RCU_GP_ONLINE;

/*
 * Active attempts to check for reader Q.S. before calling futex().
 */
#define RCU_QS_ACTIVE_ATTEMPTS 100

/*
 * Written to only by each individual reader. Read by both the reader and the
 * writers.
 */
DEFINE_URCU_TLS(struct rcu_reader, rcu_reader);

#ifdef DEBUG_YIELD
unsigned int yield_active;
DEFINE_URCU_TLS(unsigned int, rand_yield);
#endif

static CDS_LIST_HEAD(registry);

static void mutex_lock(pthread_mutex_t *mutex)
{
	int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
	ret = pthread_mutex_lock(mutex);
	if (ret) {
		perror("Error in pthread mutex lock");
		exit(-1);
	}
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
		if (ret != EBUSY && ret != EINTR) {
			printf("ret = %d, errno = %d\n", ret, errno);
			perror("Error in pthread mutex lock");
			exit(-1);
		}
		poll(NULL, 0, 10);
	}
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
	int ret;

	ret = pthread_mutex_unlock(mutex);
	if (ret) {
		perror("Error in pthread mutex unlock");
		exit(-1);
	}
}

/*
 * synchronize_rcu() waiting. Single thread.
 */
static void wait_gp(void)
{
	/* Read reader_gp before read futex */
	cmm_smp_rmb();
	if (uatomic_read(&gp_futex) == -1)
		futex_noasync(&gp_futex, FUTEX_WAIT, -1,
		      NULL, NULL, 0);
}
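
/*
 * The reader-side counterpart that wakes up this writer lives in
 * urcu/static/urcu-qsbr.h. The sketch below is illustrative, not
 * authoritative (refer to that header for the exact code): the reader
 * clears its per-thread "waiting" flag, then resets gp_futex and issues
 * a FUTEX_WAKE, pairing with the gp_futex == -1 test in wait_gp()
 * above.
 *
 *	static inline void wake_up_gp(void)
 *	{
 *		if (unlikely(_CMM_LOAD_SHARED(URCU_TLS(rcu_reader).waiting))) {
 *			_CMM_STORE_SHARED(URCU_TLS(rcu_reader).waiting, 0);
 *			cmm_smp_mb();
 *			if (uatomic_read(&gp_futex) != -1)
 *				return;
 *			uatomic_set(&gp_futex, 0);
 *			futex_noasync(&gp_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
 *		}
 *	}
 */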

static void update_counter_and_wait(void)
{
	CDS_LIST_HEAD(qsreaders);
	int wait_loops = 0;
	struct rcu_reader *index, *tmp;

#if (CAA_BITS_PER_LONG < 64)
	/* Switch parity: 0 -> 1, 1 -> 0 */
	CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
#else /* !(CAA_BITS_PER_LONG < 64) */
	/* Increment current G.P. */
	CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
#endif /* !(CAA_BITS_PER_LONG < 64) */

	/*
	 * Must commit rcu_gp_ctr update to memory before waiting for
	 * quiescent state. Failure to do so could result in the writer
	 * waiting forever while new readers are always accessing data
	 * (no progress). Enforce compiler-order of store to rcu_gp_ctr
	 * before load URCU_TLS(rcu_reader).ctr.
	 */
	cmm_barrier();

	/*
	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
	 * model easier to understand. It does not have a big performance impact
	 * anyway, given this is the write-side.
	 */
	cmm_smp_mb();

	/*
	 * Wait for each thread's URCU_TLS(rcu_reader).ctr to show a
	 * quiescent state: either offline (ctr == 0) or caught up with
	 * the current rcu_gp_ctr value.
	 */
	for (;;) {
		wait_loops++;
		if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
			uatomic_set(&gp_futex, -1);
			/*
			 * Write futex before write waiting (the other side
			 * reads them in the opposite order).
			 */
			cmm_smp_wmb();
			cds_list_for_each_entry(index, &registry, node) {
				_CMM_STORE_SHARED(index->waiting, 1);
			}
			/* Write futex before read reader_gp */
			cmm_smp_mb();
		}
		cds_list_for_each_entry_safe(index, tmp, &registry, node) {
			if (!rcu_gp_ongoing(&index->ctr))
				cds_list_move(&index->node, &qsreaders);
		}

		if (cds_list_empty(&registry)) {
			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
				/* Read reader_gp before write futex */
				cmm_smp_mb();
				uatomic_set(&gp_futex, 0);
			}
			break;
		} else {
			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
				wait_gp();
			} else {
#ifndef HAS_INCOHERENT_CACHES
				caa_cpu_relax();
#else /* #ifndef HAS_INCOHERENT_CACHES */
				cmm_smp_mb();
#endif /* #else #ifndef HAS_INCOHERENT_CACHES */
			}
		}
	}
	/* Put back the reader list in the registry. */
	cds_list_splice(&qsreaders, &registry);
}
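
/*
 * The quiescence test used above, rcu_gp_ongoing(), is implemented in
 * urcu/static/urcu-qsbr.h. As a rough sketch (see the header for the
 * authoritative version), a reader still blocks the grace period when
 * its counter is non-zero (online) yet differs from the global counter,
 * i.e. it has not passed through a quiescent state since rcu_gp_ctr was
 * last updated:
 *
 *	static inline int rcu_gp_ongoing(unsigned long *ctr)
 *	{
 *		unsigned long v;
 *
 *		v = CMM_LOAD_SHARED(*ctr);
 *		return v && (v != rcu_gp_ctr);
 *	}
 */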

/*
 * Using a two-subphase (parity-flip) algorithm for architectures with a
 * long size smaller than 64 bits: a free-running 32-bit grace period
 * counter could overflow and be mistaken for a reader's stale counter
 * snapshot, whereas a 64-bit counter cannot realistically wrap.
 */
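
/*
 * Worked example, assuming the bit layout from urcu/static/urcu-qsbr.h
 * (RCU_GP_ONLINE is bit 0, RCU_GP_CTR is bit 1): on 32-bit, rcu_gp_ctr
 * toggles between 1 and 3 as update_counter_and_wait() XORs in
 * RCU_GP_CTR. Each reader copies rcu_gp_ctr into its own ctr at every
 * quiescent state, so a reader whose ctr still carries the previous
 * parity is known to predate the current subphase. Running two
 * subphases back to back guarantees every pre-existing reader has been
 * waited for before synchronize_rcu() returns.
 */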
#if (CAA_BITS_PER_LONG < 64)
void synchronize_rcu(void)
{
	unsigned long was_online;

	was_online = URCU_TLS(rcu_reader).ctr;

	/*
	 * All threads should read qparity before accessing the data
	 * structure pointed to by the new pointer. In the "then" case,
	 * rcu_thread_offline() includes a memory barrier.
	 *
	 * Mark the writer thread offline to make sure we don't wait for
	 * our own quiescent state. This allows using synchronize_rcu()
	 * in threads registered as readers.
	 */
	if (was_online)
		rcu_thread_offline();
	else
		cmm_smp_mb();

	mutex_lock(&rcu_gp_lock);

	if (cds_list_empty(&registry))
		goto out;

	/*
	 * Wait for previous parity to be empty of readers.
	 */
	update_counter_and_wait();	/* 0 -> 1, wait readers in parity 0 */

	/*
	 * Must finish waiting for quiescent state for parity 0 before
	 * committing next rcu_gp_ctr update to memory. Failure to
	 * do so could result in the writer waiting forever while new
	 * readers are always accessing data (no progress). Enforce
	 * compiler-order of load URCU_TLS(rcu_reader).ctr before store to
	 * rcu_gp_ctr.
	 */
	cmm_barrier();

	/*
	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
	 * model easier to understand. It does not have a big performance impact
	 * anyway, given this is the write-side.
	 */
	cmm_smp_mb();

	/*
	 * Wait for previous parity to be empty of readers.
	 */
	update_counter_and_wait();	/* 1 -> 0, wait readers in parity 1 */
out:
	mutex_unlock(&rcu_gp_lock);

	/*
	 * Finish waiting for reader threads before letting the old pointer
	 * be freed.
	 */
	if (was_online)
		rcu_thread_online();
	else
		cmm_smp_mb();
}
#else /* !(CAA_BITS_PER_LONG < 64) */
void synchronize_rcu(void)
{
	unsigned long was_online;

	was_online = URCU_TLS(rcu_reader).ctr;

	/*
	 * Mark the writer thread offline to make sure we don't wait for
	 * our own quiescent state. This allows using synchronize_rcu()
	 * in threads registered as readers.
	 */
	if (was_online)
		rcu_thread_offline();
	else
		cmm_smp_mb();

	mutex_lock(&rcu_gp_lock);
	if (cds_list_empty(&registry))
		goto out;
	update_counter_and_wait();
out:
	mutex_unlock(&rcu_gp_lock);

	if (was_online)
		rcu_thread_online();
	else
		cmm_smp_mb();
}
#endif /* !(CAA_BITS_PER_LONG < 64) */
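
/*
 * Typical writer-side usage, for illustration only (struct mydata,
 * gp_data, old and new are hypothetical application names;
 * rcu_xchg_pointer() comes from urcu-pointer.h, included above):
 *
 *	struct mydata *old, *new;
 *
 *	new = malloc(sizeof(*new));
 *	new->a = 42;
 *	old = rcu_xchg_pointer(&gp_data, new);
 *	synchronize_rcu();	// wait for all pre-existing readers
 *	free(old);		// no reader can still hold a reference
 */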

/*
 * Library wrappers to be used by non-LGPL compatible source code.
 */

void rcu_read_lock(void)
{
	_rcu_read_lock();
}

void rcu_read_unlock(void)
{
	_rcu_read_unlock();
}

void rcu_quiescent_state(void)
{
	_rcu_quiescent_state();
}

void rcu_thread_offline(void)
{
	_rcu_thread_offline();
}

void rcu_thread_online(void)
{
	_rcu_thread_online();
}

void rcu_register_thread(void)
{
	URCU_TLS(rcu_reader).tid = pthread_self();
	assert(URCU_TLS(rcu_reader).ctr == 0);

	mutex_lock(&rcu_gp_lock);
	cds_list_add(&URCU_TLS(rcu_reader).node, &registry);
	mutex_unlock(&rcu_gp_lock);
	_rcu_thread_online();
}

void rcu_unregister_thread(void)
{
	/*
	 * We have to make the thread offline, otherwise we end up
	 * deadlocking with a waiting writer.
	 */
	_rcu_thread_offline();
	mutex_lock(&rcu_gp_lock);
	cds_list_del(&URCU_TLS(rcu_reader).node);
	mutex_unlock(&rcu_gp_lock);
}
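
/*
 * Typical reader-thread lifecycle under QSBR, for illustration only
 * (still_running, no_work_pending, read_data() and wait_for_work() are
 * hypothetical application names):
 *
 *	rcu_register_thread();
 *	while (still_running) {
 *		rcu_read_lock();
 *		read_data();
 *		rcu_read_unlock();
 *		rcu_quiescent_state();	// report a quiescent state
 *		if (no_work_pending) {
 *			rcu_thread_offline();	// allow long blocking
 *			wait_for_work();
 *			rcu_thread_online();
 *		}
 *	}
 *	rcu_unregister_thread();
 *
 * Under QSBR, rcu_read_lock()/rcu_read_unlock() generate no code;
 * correctness relies on every registered thread periodically calling
 * rcu_quiescent_state() or marking itself offline.
 */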

void rcu_exit(void)
{
	/*
	 * Assertion disabled because call_rcu threads are now rcu
	 * readers, and left running at exit.
	 * assert(cds_list_empty(&registry));
	 */
}

DEFINE_RCU_FLAVOR(rcu_flavor);

#include "urcu-call-rcu-impl.h"
#include "urcu-defer-impl.h"