urcu-qsbr: batch concurrent synchronize_rcu()
/*
 * urcu-qsbr.c
 *
 * Userspace RCU QSBR library
 *
 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
 */

#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <poll.h>

#include "urcu/wfcqueue.h"
#include "urcu/wfstack.h"
#include "urcu/map/urcu-qsbr.h"
#define BUILD_QSBR_LIB
#include "urcu/static/urcu-qsbr.h"
#include "urcu-pointer.h"
#include "urcu/tls-compat.h"

#include "urcu-die.h"

/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
#include "urcu-qsbr.h"
#define _LGPL_SOURCE

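/*
 * Example usage sketch (illustrative only): how a registered QSBR
 * reader thread and an updater might use the public API wrapped in
 * this file. "struct mydata", "shared_ptr", "stop" and use() are
 * hypothetical placeholders, not part of the library.
 *
 *        struct mydata *shared_ptr;
 *        int stop;
 *
 *        void reader_thread(void)
 *        {
 *                struct mydata *p;
 *
 *                rcu_register_thread();
 *                while (!stop) {
 *                        rcu_read_lock();
 *                        p = rcu_dereference(shared_ptr);
 *                        if (p)
 *                                use(p);         // read-only access
 *                        rcu_read_unlock();
 *                        // QSBR: each registered thread must announce
 *                        // quiescent states periodically.
 *                        rcu_quiescent_state();
 *                }
 *                rcu_unregister_thread();
 *        }
 *
 *        void update(struct mydata *newp)
 *        {
 *                struct mydata *oldp;
 *
 *                oldp = rcu_xchg_pointer(&shared_ptr, newp);
 *                synchronize_rcu();      // wait for registered readers
 *                free(oldp);             // now safe to reclaim
 *        }
 */
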
void __attribute__((destructor)) rcu_exit(void);

static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;

int32_t rcu_gp_futex;

/*
 * Global grace period counter.
 */
unsigned long rcu_gp_ctr = RCU_GP_ONLINE;

/*
 * Active attempts to check for reader Q.S. before calling futex().
 */
#define RCU_QS_ACTIVE_ATTEMPTS 100

/*
 * Written to only by each individual reader. Read by both the reader and the
 * writers.
 */
DEFINE_URCU_TLS(struct rcu_reader, rcu_reader);

#ifdef DEBUG_YIELD
unsigned int rcu_yield_active;
DEFINE_URCU_TLS(unsigned int, rcu_rand_yield);
#endif

static CDS_LIST_HEAD(registry);

/*
 * Number of busy-loop attempts before waiting on futex for grace period
 * batching.
 */
#define RCU_AWAKE_ATTEMPTS 1000

enum adapt_wakeup_state {
        /* AWAKE_WAITING is compared directly (futex compares it). */
        AWAKE_WAITING = 0,
        /* Non-zero values are used as bit masks. */
        AWAKE_WAKEUP = (1 << 0),
        AWAKE_AWAKENED = (1 << 1),
        AWAKE_TEARDOWN = (1 << 2),
};
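
/*
 * Lifecycle of a wait_futex word, as used by urcu_adaptative_busy_wait()
 * (waiter side) and urcu_adaptative_wake_up() (waker side) below: the
 * waiter initializes it to AWAKE_WAITING, the waker sets AWAKE_WAKEUP,
 * the waiter then ORs in AWAKE_AWAKENED, and the waker finally ORs in
 * AWAKE_TEARDOWN. The waiter owns the memory holding the word and may
 * only reclaim it after observing AWAKE_TEARDOWN.
 */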

struct gp_waiters_thread {
        struct cds_wfs_node node;
        int32_t wait_futex;
};

/*
 * Stack of threads queued up to wait for a grace period. Contains
 * struct gp_waiters_thread objects.
 */
static struct cds_wfs_stack gp_waiters = {
        .head = CDS_WFS_END,
        .lock = PTHREAD_MUTEX_INITIALIZER,
};
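
/*
 * Grace period batching (see synchronize_rcu() below): each caller
 * pushes a gp_waiters_thread node onto this stack. The first pusher
 * (cds_wfs_push() returns 0) becomes the grace period leader: it pops
 * the whole stack, performs the grace period, then wakes every other
 * waiter it popped. Non-leader callers simply block on their per-node
 * wait_futex until the leader wakes them.
 */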

static void mutex_lock(pthread_mutex_t *mutex)
{
        int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
        ret = pthread_mutex_lock(mutex);
        if (ret)
                urcu_die(ret);
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
        while ((ret = pthread_mutex_trylock(mutex)) != 0) {
                if (ret != EBUSY && ret != EINTR)
                        urcu_die(ret);
                poll(NULL, 0, 10);
        }
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
        int ret;

        ret = pthread_mutex_unlock(mutex);
        if (ret)
                urcu_die(ret);
}

/*
 * synchronize_rcu() waiting. Single thread.
 */
static void wait_gp(void)
{
        /* Read reader_gp before read futex */
        cmm_smp_rmb();
        if (uatomic_read(&rcu_gp_futex) == -1)
                futex_noasync(&rcu_gp_futex, FUTEX_WAIT, -1,
                              NULL, NULL, 0);
}
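
/*
 * rcu_gp_futex handshake: wait_for_readers() sets rcu_gp_futex to -1
 * after RCU_QS_ACTIVE_ATTEMPTS busy-loop iterations, flags each
 * remaining reader through index->waiting, then blocks in wait_gp().
 * It resets the futex to 0 once every reader has been observed
 * quiescent. The matching FUTEX_WAKE is expected to come from the
 * reader side (implemented in urcu/static/urcu-qsbr.h) when a flagged
 * reader passes through a quiescent state or goes offline.
 */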

/*
 * Note: urcu_adaptative_wake_up needs "value" to stay allocated
 * throughout its execution. In this scheme, the waiter owns the futex
 * memory, and we only allow it to free this memory when it receives the
 * AWAKE_TEARDOWN flag.
 */
static void urcu_adaptative_wake_up(int32_t *value)
{
        cmm_smp_mb();
        assert(uatomic_read(value) == AWAKE_WAITING);
        uatomic_set(value, AWAKE_WAKEUP);
        if (!(uatomic_read(value) & AWAKE_AWAKENED))
                futex_noasync(value, FUTEX_WAKE, 1, NULL, NULL, 0);
        /* Allow teardown of "value" memory. */
        uatomic_or(value, AWAKE_TEARDOWN);
}

/*
 * Caller must initialize "value" to AWAKE_WAITING before passing its
 * memory to waker thread.
 */
static void urcu_adaptative_busy_wait(int32_t *value)
{
        unsigned int i;

        /* Load and test condition before read futex */
        cmm_smp_rmb();
        for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) {
                if (uatomic_read(value) != AWAKE_WAITING)
                        goto skip_futex_wait;
                caa_cpu_relax();
        }
        futex_noasync(value, FUTEX_WAIT, AWAKE_WAITING, NULL, NULL, 0);
skip_futex_wait:

        /* Tell waker thread that we are awakened. */
        uatomic_or(value, AWAKE_AWAKENED);

        /*
         * Wait until waker thread lets us know it's ok to tear down
         * memory allocated for value.
         */
        for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) {
                if (uatomic_read(value) & AWAKE_TEARDOWN)
                        break;
                caa_cpu_relax();
        }
        while (!(uatomic_read(value) & AWAKE_TEARDOWN))
                poll(NULL, 0, 10);
        assert(uatomic_read(value) & AWAKE_TEARDOWN);
}

static void wait_for_readers(struct cds_list_head *input_readers,
                             struct cds_list_head *cur_snap_readers,
                             struct cds_list_head *qsreaders)
{
        int wait_loops = 0;
        struct rcu_reader *index, *tmp;

        /*
         * Wait for each thread URCU_TLS(rcu_reader).ctr to either
         * indicate quiescence (offline), or for them to observe the
         * current rcu_gp_ctr value.
         */
        for (;;) {
                wait_loops++;
                if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                        uatomic_set(&rcu_gp_futex, -1);
                        /*
                         * Write futex before write waiting (the other side
                         * reads them in the opposite order).
                         */
                        cmm_smp_wmb();
                        cds_list_for_each_entry(index, input_readers, node) {
                                _CMM_STORE_SHARED(index->waiting, 1);
                        }
                        /* Write futex before read reader_gp */
                        cmm_smp_mb();
                }
                cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
                        switch (rcu_reader_state(&index->ctr)) {
                        case RCU_READER_ACTIVE_CURRENT:
                                if (cur_snap_readers) {
                                        cds_list_move(&index->node,
                                                      cur_snap_readers);
                                        break;
                                }
                                /* Fall-through */
                        case RCU_READER_INACTIVE:
                                cds_list_move(&index->node, qsreaders);
                                break;
                        case RCU_READER_ACTIVE_OLD:
                                /*
                                 * Old snapshot. Leaving node in
                                 * input_readers will make us busy-loop
                                 * until the snapshot becomes current or
                                 * the reader becomes inactive.
                                 */
                                break;
                        }
                }

                if (cds_list_empty(input_readers)) {
                        if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                /* Read reader_gp before write futex */
                                cmm_smp_mb();
                                uatomic_set(&rcu_gp_futex, 0);
                        }
                        break;
                } else {
                        if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                wait_gp();
                        } else {
#ifndef HAS_INCOHERENT_CACHES
                                caa_cpu_relax();
#else /* #ifndef HAS_INCOHERENT_CACHES */
                                cmm_smp_mb();
#endif /* #else #ifndef HAS_INCOHERENT_CACHES */
                        }
                }
        }
}

/*
 * Use a two-subphase algorithm on architectures where long is smaller
 * than 64 bits, to make sure we do not run into a grace period counter
 * overflow bug.
 */

#if (CAA_BITS_PER_LONG < 64)
void synchronize_rcu(void)
{
        CDS_LIST_HEAD(cur_snap_readers);
        CDS_LIST_HEAD(qsreaders);
        unsigned long was_online;
        struct gp_waiters_thread gp_waiters_thread;
        struct cds_wfs_head *gp_waiters_head;
        struct cds_wfs_node *waiters_iter, *waiters_iter_n;

        was_online = URCU_TLS(rcu_reader).ctr;

        /*
         * All threads should read qparity before accessing the data
         * structure pointed to by the new pointer. In the "then" case,
         * rcu_thread_offline includes a memory barrier.
         *
         * Mark the writer thread offline to make sure we don't wait for
         * our own quiescent state. This allows using synchronize_rcu()
         * in threads registered as readers.
         */
        if (was_online)
                rcu_thread_offline();
        else
                cmm_smp_mb();

        /*
         * Add ourselves to the gp_waiters stack of threads waiting for
         * a grace period. Perform the grace period only if we are the
         * first thread pushed onto the stack.
         */
        cds_wfs_node_init(&gp_waiters_thread.node);
        gp_waiters_thread.wait_futex = AWAKE_WAITING;
        if (cds_wfs_push(&gp_waiters, &gp_waiters_thread.node) != 0) {
                /* Not first in stack: will be awakened by another thread. */
                urcu_adaptative_busy_wait(&gp_waiters_thread.wait_futex);
                goto gp_end;
        }

        mutex_lock(&rcu_gp_lock);

        /*
         * Pop all waiters into our local stack head.
         */
        gp_waiters_head = __cds_wfs_pop_all(&gp_waiters);

        if (cds_list_empty(&registry))
                goto out;

        /*
         * Wait for readers to observe original parity or be quiescent.
         */
        wait_for_readers(&registry, &cur_snap_readers, &qsreaders);

        /*
         * Must finish waiting for quiescent state for original parity
         * before committing next rcu_gp_ctr update to memory. Failure
         * to do so could result in the writer waiting forever while new
         * readers are always accessing data (no progress). Enforce
         * compiler-order of load URCU_TLS(rcu_reader).ctr before store
         * to rcu_gp_ctr.
         */
        cmm_barrier();

        /*
         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
         * model easier to understand. It does not have a big performance impact
         * anyway, given this is the write-side.
         */
        cmm_smp_mb();

        /* Switch parity: 0 -> 1, 1 -> 0 */
        CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);

        /*
         * Must commit rcu_gp_ctr update to memory before waiting for
         * quiescent state. Failure to do so could result in the writer
         * waiting forever while new readers are always accessing data
         * (no progress). Enforce compiler-order of store to rcu_gp_ctr
         * before load URCU_TLS(rcu_reader).ctr.
         */
        cmm_barrier();

        /*
         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
         * model easier to understand. It does not have a big performance impact
         * anyway, given this is the write-side.
         */
        cmm_smp_mb();

        /*
         * Wait for readers to observe new parity or be quiescent.
         */
        wait_for_readers(&cur_snap_readers, NULL, &qsreaders);

        /*
         * Put quiescent reader list back into registry.
         */
        cds_list_splice(&qsreaders, &registry);
out:
        mutex_unlock(&rcu_gp_lock);

        /* Wake all waiters in our stack head, excluding ourselves. */
        cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter,
                                       waiters_iter_n) {
                struct gp_waiters_thread *wt;

                wt = caa_container_of(waiters_iter,
                                      struct gp_waiters_thread, node);
                if (wt == &gp_waiters_thread)
                        continue;
                urcu_adaptative_wake_up(&wt->wait_futex);
        }

gp_end:
        /*
         * Finish waiting for reader threads before letting the old ptr
         * be freed.
         */
        if (was_online)
                rcu_thread_online();
        else
                cmm_smp_mb();
}
#else /* !(CAA_BITS_PER_LONG < 64) */
void synchronize_rcu(void)
{
        CDS_LIST_HEAD(qsreaders);
        unsigned long was_online;
        struct gp_waiters_thread gp_waiters_thread;
        struct cds_wfs_head *gp_waiters_head;
        struct cds_wfs_node *waiters_iter, *waiters_iter_n;

        was_online = URCU_TLS(rcu_reader).ctr;

        /*
         * Mark the writer thread offline to make sure we don't wait for
         * our own quiescent state. This allows using synchronize_rcu()
         * in threads registered as readers.
         */
        if (was_online)
                rcu_thread_offline();
        else
                cmm_smp_mb();

        /*
         * Add ourselves to the gp_waiters stack of threads waiting for
         * a grace period. Perform the grace period only if we are the
         * first thread pushed onto the stack.
         */
        cds_wfs_node_init(&gp_waiters_thread.node);
        gp_waiters_thread.wait_futex = AWAKE_WAITING;
        if (cds_wfs_push(&gp_waiters, &gp_waiters_thread.node) != 0) {
                /* Not first in stack: will be awakened by another thread. */
                urcu_adaptative_busy_wait(&gp_waiters_thread.wait_futex);
                goto gp_end;
        }

        mutex_lock(&rcu_gp_lock);

        /*
         * Pop all waiters into our local stack head.
         */
        gp_waiters_head = __cds_wfs_pop_all(&gp_waiters);

        if (cds_list_empty(&registry))
                goto out;

        /* Increment current G.P. */
        CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);

        /*
         * Must commit rcu_gp_ctr update to memory before waiting for
         * quiescent state. Failure to do so could result in the writer
         * waiting forever while new readers are always accessing data
         * (no progress). Enforce compiler-order of store to rcu_gp_ctr
         * before load URCU_TLS(rcu_reader).ctr.
         */
        cmm_barrier();

        /*
         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
         * model easier to understand. It does not have a big performance impact
         * anyway, given this is the write-side.
         */
        cmm_smp_mb();

        /*
         * Wait for readers to observe the new count or be quiescent.
         */
        wait_for_readers(&registry, NULL, &qsreaders);

        /*
         * Put quiescent reader list back into registry.
         */
        cds_list_splice(&qsreaders, &registry);
out:
        mutex_unlock(&rcu_gp_lock);

        /* Wake all waiters in our stack head, excluding ourselves. */
        cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter,
                                       waiters_iter_n) {
                struct gp_waiters_thread *wt;

                wt = caa_container_of(waiters_iter,
                                      struct gp_waiters_thread, node);
                if (wt == &gp_waiters_thread)
                        continue;
                urcu_adaptative_wake_up(&wt->wait_futex);
        }

gp_end:
        if (was_online)
                rcu_thread_online();
        else
                cmm_smp_mb();
}
#endif /* !(CAA_BITS_PER_LONG < 64) */

/*
 * library wrappers to be used by non-LGPL compatible source code.
 */

void rcu_read_lock(void)
{
        _rcu_read_lock();
}

void rcu_read_unlock(void)
{
        _rcu_read_unlock();
}

void rcu_quiescent_state(void)
{
        _rcu_quiescent_state();
}

void rcu_thread_offline(void)
{
        _rcu_thread_offline();
}

void rcu_thread_online(void)
{
        _rcu_thread_online();
}

void rcu_register_thread(void)
{
        URCU_TLS(rcu_reader).tid = pthread_self();
        assert(URCU_TLS(rcu_reader).ctr == 0);

        mutex_lock(&rcu_gp_lock);
        cds_list_add(&URCU_TLS(rcu_reader).node, &registry);
        mutex_unlock(&rcu_gp_lock);
        _rcu_thread_online();
}

void rcu_unregister_thread(void)
{
        /*
         * We have to make the thread offline, otherwise we end up
         * deadlocking with a waiting writer.
         */
        _rcu_thread_offline();
        mutex_lock(&rcu_gp_lock);
        cds_list_del(&URCU_TLS(rcu_reader).node);
        mutex_unlock(&rcu_gp_lock);
}

void rcu_exit(void)
{
        /*
         * Assertion disabled because call_rcu threads are now rcu
         * readers, and left running at exit.
         * assert(cds_list_empty(&registry));
         */
}

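/*
 * DEFINE_RCU_FLAVOR() below instantiates the flavor descriptor used by
 * the call_rcu and defer implementations included at the end of this
 * file, presumably wiring the generic flavor operations to this file's
 * QSBR primitives.
 */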
DEFINE_RCU_FLAVOR(rcu_flavor);

#include "urcu-call-rcu-impl.h"
#include "urcu-defer-impl.h"