src/urcu-qsbr.c (urcu.git)
// SPDX-FileCopyrightText: 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
// SPDX-FileCopyrightText: 2009 Paul E. McKenney, IBM Corporation.
//
// SPDX-License-Identifier: LGPL-2.1-or-later

/*
 * Userspace RCU QSBR library
 *
 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
 */

#define URCU_NO_COMPAT_IDENTIFIERS
#define _LGPL_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <poll.h>

#include <urcu/annotate.h>
#include <urcu/assert.h>
#include <urcu/wfcqueue.h>
#include <urcu/map/urcu-qsbr.h>
#define BUILD_QSBR_LIB
#include <urcu/static/urcu-qsbr.h>
#include <urcu/pointer.h>
#include <urcu/tls-compat.h>

#include "urcu-die.h"
#include "urcu-wait.h"
#include "urcu-utils.h"

#define URCU_API_MAP
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
#include <urcu/urcu-qsbr.h>
#define _LGPL_SOURCE

void __attribute__((destructor)) urcu_qsbr_exit(void);
static void urcu_call_rcu_exit(void);

/*
 * rcu_gp_lock ensures mutual exclusion between threads calling
 * synchronize_rcu().
 */
static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;
/*
 * rcu_registry_lock ensures mutual exclusion between threads
 * registering and unregistering themselves to/from the registry, and
 * with threads reading that registry from synchronize_rcu(). However,
 * this lock is not held for the entire duration of grace-period
 * waiting: it is released sporadically between iterations over the
 * registry.
 * rcu_registry_lock may nest inside rcu_gp_lock.
 */
static pthread_mutex_t rcu_registry_lock = PTHREAD_MUTEX_INITIALIZER;
struct urcu_gp urcu_qsbr_gp = { .ctr = URCU_QSBR_GP_ONLINE };

/*
 * Active attempts to check for reader Q.S. before calling futex().
 */
#define RCU_QS_ACTIVE_ATTEMPTS 100

/*
 * Written only by each individual reader. Read by both the reader and
 * the writers.
 */
DEFINE_URCU_TLS(struct urcu_qsbr_reader, urcu_qsbr_reader);
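
/*
 * Informal sketch (not a verbatim copy of the library fast path): a reader
 * reports a quiescent state by copying the global grace period counter into
 * its own TLS counter, so that synchronize_rcu() can observe that the reader
 * has seen the current urcu_qsbr_gp.ctr value. The real implementation lives
 * in <urcu/static/urcu-qsbr.h>; conceptually it does roughly:
 *
 *        uatomic_store(&URCU_TLS(urcu_qsbr_reader).ctr,
 *                uatomic_load(&urcu_qsbr_gp.ctr, CMM_RELAXED), CMM_SEQ_CST);
 *        // then wake up a grace-period thread sleeping on urcu_qsbr_gp.futex
 */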

static CDS_LIST_HEAD(registry);

/*
 * Queue of threads waiting for a grace period to be performed. Contains
 * struct gp_waiters_thread objects.
 */
static DEFINE_URCU_WAIT_QUEUE(gp_waiters);

static void mutex_lock(pthread_mutex_t *mutex)
{
        int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
        ret = pthread_mutex_lock(mutex);
        if (ret)
                urcu_die(ret);
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
        while ((ret = pthread_mutex_trylock(mutex)) != 0) {
                if (ret != EBUSY && ret != EINTR)
                        urcu_die(ret);
                poll(NULL, 0, 10);
        }
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
        int ret;

        ret = pthread_mutex_unlock(mutex);
        if (ret)
                urcu_die(ret);
}

/*
 * synchronize_rcu() waiting. Single thread.
 */
static void wait_gp(void)
{
        /* Read reader_gp before read futex */
        cmm_smp_rmb();
        while (uatomic_read(&urcu_qsbr_gp.futex) == -1) {
                if (!futex_noasync(&urcu_qsbr_gp.futex, FUTEX_WAIT, -1, NULL, NULL, 0)) {
                        /*
                         * Prior wakeups queued by unrelated code using
                         * the same address can cause futex wait to
                         * return 0 even though the futex value is still
                         * -1 (spurious wakeups). Check the value again
                         * in user-space to validate whether it really
                         * differs from -1.
                         */
                        continue;
                }
                switch (errno) {
                case EAGAIN:
                        /* Value already changed. */
                        return;
                case EINTR:
                        /* Retry if interrupted by signal. */
                        break;  /* Get out of switch. Check again. */
                default:
                        /* Unexpected error. */
                        urcu_die(errno);
                }
        }
}
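
/*
 * The wake-up counterpart of wait_gp() lives on the reader side, in the
 * static inlines of <urcu/static/urcu-qsbr.h>. Informal sketch, assuming
 * that helper keeps its usual shape (names here are descriptive, not a
 * verbatim copy):
 *
 *        if (uatomic_read(&urcu_qsbr_gp.futex) == -1) {
 *                uatomic_set(&urcu_qsbr_gp.futex, 0);
 *                futex_noasync(&urcu_qsbr_gp.futex, FUTEX_WAKE, 1,
 *                                NULL, NULL, 0);
 *        }
 *
 * wait_gp() therefore only sleeps while the futex value is -1; a reader
 * reaching a quiescent state resets it to 0 and wakes the single waiting
 * grace-period thread.
 */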

/*
 * Always called with rcu_registry lock held. Releases this lock between
 * iterations and grabs it again. Holds the lock when it returns.
 */
static void wait_for_readers(struct cds_list_head *input_readers,
                struct cds_list_head *cur_snap_readers,
                struct cds_list_head *qsreaders,
                cmm_annotate_t *group)
{
        unsigned int wait_loops = 0;
        struct urcu_qsbr_reader *index, *tmp;

        /*
         * Wait for each thread URCU_TLS(urcu_qsbr_reader).ctr to either
         * indicate quiescence (offline), or for them to observe the
         * current urcu_qsbr_gp.ctr value.
         */
        for (;;) {
                if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
                        wait_loops++;
                if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                        uatomic_set(&urcu_qsbr_gp.futex, -1);
                        /*
                         * Write futex before write waiting (the other side
                         * reads them in the opposite order).
                         */
                        cmm_smp_wmb();
                        cds_list_for_each_entry(index, input_readers, node) {
                                _CMM_STORE_SHARED(index->waiting, 1);
                        }
                        /* Write futex before read reader_gp */
                        cmm_smp_mb();
                }
                cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
                        switch (urcu_qsbr_reader_state(&index->ctr, group)) {
                        case URCU_READER_ACTIVE_CURRENT:
                                if (cur_snap_readers) {
                                        cds_list_move(&index->node,
                                                cur_snap_readers);
                                        break;
                                }
                                /* Fall-through */
                        case URCU_READER_INACTIVE:
                                cds_list_move(&index->node, qsreaders);
                                break;
                        case URCU_READER_ACTIVE_OLD:
                                /*
                                 * Old snapshot. Leaving node in
                                 * input_readers will make us busy-loop
                                 * until the snapshot becomes current or
                                 * the reader becomes inactive.
                                 */
                                break;
                        }
                }

                if (cds_list_empty(input_readers)) {
                        if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                /* Read reader_gp before write futex */
                                uatomic_store(&urcu_qsbr_gp.futex, 0, CMM_RELEASE);
                        }
                        break;
                } else {
                        /* Temporarily unlock the registry lock. */
                        mutex_unlock(&rcu_registry_lock);
                        if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                wait_gp();
                        } else {
#ifndef HAS_INCOHERENT_CACHES
                                caa_cpu_relax();
#else /* #ifndef HAS_INCOHERENT_CACHES */
                                cmm_smp_mb();
#endif /* #else #ifndef HAS_INCOHERENT_CACHES */
                        }
                        /* Re-lock the registry lock before the next loop. */
                        mutex_lock(&rcu_registry_lock);
                }
        }
}
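
/*
 * Ordering summary for the slow path above (informal sketch of the intended
 * pairing, derived from the barrier comments in this file): the grace-period
 * thread and the readers touch the futex and the per-reader fields in
 * opposite orders, which is what the cmm_smp_wmb()/cmm_smp_mb() pairs are
 * there to enforce.
 *
 *        grace-period thread (above)         reader (quiescent state)
 *        ---------------------------         ------------------------
 *        store gp.futex = -1                 store reader ctr
 *        cmm_smp_wmb()                       cmm_smp_mb()
 *        store reader->waiting = 1           load reader->waiting
 *        cmm_smp_mb()                        if waiting: wake gp.futex
 *        load reader ctr
 */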

/*
 * Use a two-subphase algorithm for architectures whose unsigned long is
 * smaller than 64 bits, to ensure we do not encounter a counter overflow
 * bug.
 */

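/*
 * Why two subphases (informal illustration, not upstream documentation): a
 * free-running 32-bit counter could wrap while a reader sleeps, so a stale
 * reader snapshot could compare equal to the current counter and the reader
 * would wrongly be treated as up to date. The 32-bit implementation below
 * therefore only toggles the URCU_QSBR_GP_CTR parity bit and waits twice:
 *
 *        wait_for_readers(&registry, &cur_snap_readers, &qsreaders, ...);
 *        urcu_qsbr_gp.ctr ^= URCU_QSBR_GP_CTR;
 *        wait_for_readers(&cur_snap_readers, NULL, &qsreaders, ...);
 *
 * A reader snapshot is then always either "current parity" or "old parity",
 * so counter overflow can never be mistaken for progress. The 64-bit version
 * keeps the simpler single increment: a 64-bit counter cannot realistically
 * wrap within a reader's lifetime.
 */
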
#if (CAA_BITS_PER_LONG < 64)
void urcu_qsbr_synchronize_rcu(void)
{
        cmm_annotate_define(acquire_group);
        cmm_annotate_define(release_group);
        CDS_LIST_HEAD(cur_snap_readers);
        CDS_LIST_HEAD(qsreaders);
        unsigned long was_online;
        DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING);
        struct urcu_waiters waiters;

        was_online = urcu_qsbr_read_ongoing();

        /*
         * All threads should read qparity before accessing the data
         * structure where the new ptr points to. In the "then" case,
         * rcu_thread_offline includes a memory barrier.
         *
         * Mark the writer thread offline to make sure we don't wait for
         * our own quiescent state. This allows using synchronize_rcu()
         * in threads registered as readers.
         */
        if (was_online)
                urcu_qsbr_thread_offline();
        else
                cmm_smp_mb();
        cmm_annotate_group_mb_release(&release_group);

        /*
         * Add ourself to the gp_waiters queue of threads waiting for a
         * grace period. Proceed to perform the grace period only if we
         * are the first thread added into the queue.
         */
        if (urcu_wait_add(&gp_waiters, &wait) != 0) {
                /* Not first in queue: will be awakened by another thread. */
                urcu_adaptative_busy_wait(&wait);
                goto gp_end;
        }
        /* We won't need to wake ourself up */
        urcu_wait_set_state(&wait, URCU_WAIT_RUNNING);

        mutex_lock(&rcu_gp_lock);

        /*
         * Move all waiters into our local queue.
         */
        urcu_move_waiters(&waiters, &gp_waiters);

        mutex_lock(&rcu_registry_lock);

        if (cds_list_empty(&registry))
                goto out;

        /*
         * Wait for readers to observe original parity or be quiescent.
         * wait_for_readers() can release and re-acquire rcu_registry_lock
         * internally.
         */
        wait_for_readers(&registry, &cur_snap_readers, &qsreaders, &acquire_group);

        /*
         * Must finish waiting for quiescent state for original parity
         * before committing next urcu_qsbr_gp.ctr update to memory. Failure
         * to do so could result in the writer waiting forever while new
         * readers are always accessing data (no progress). Enforce
         * compiler-order of load URCU_TLS(urcu_qsbr_reader).ctr before store
         * to urcu_qsbr_gp.ctr.
         */
        cmm_barrier();

        /*
         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
         * model easier to understand. It does not have a big performance impact
         * anyway, given this is the write-side.
         */
        cmm_smp_mb();

        /* Switch parity: 0 -> 1, 1 -> 0 */
        cmm_annotate_group_mem_release(&release_group, &urcu_qsbr_gp.ctr);
        uatomic_store(&urcu_qsbr_gp.ctr, urcu_qsbr_gp.ctr ^ URCU_QSBR_GP_CTR, CMM_RELAXED);

        /*
         * Must commit urcu_qsbr_gp.ctr update to memory before waiting for
         * quiescent state. Failure to do so could result in the writer
         * waiting forever while new readers are always accessing data
         * (no progress). Enforce compiler-order of store to urcu_qsbr_gp.ctr
         * before load URCU_TLS(urcu_qsbr_reader).ctr.
         */
        cmm_barrier();

        /*
         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
         * model easier to understand. It does not have a big performance impact
         * anyway, given this is the write-side.
         */
        cmm_smp_mb();

        /*
         * Wait for readers to observe new parity or be quiescent.
         * wait_for_readers() can release and re-acquire rcu_registry_lock
         * internally.
         */
        wait_for_readers(&cur_snap_readers, NULL, &qsreaders, &acquire_group);

        /*
         * Put quiescent reader list back into registry.
         */
        cds_list_splice(&qsreaders, &registry);
out:
        mutex_unlock(&rcu_registry_lock);
        mutex_unlock(&rcu_gp_lock);
        urcu_wake_all_waiters(&waiters);
gp_end:
        /*
         * Finish waiting for reader threads before letting the old ptr be
         * freed.
         */
        cmm_annotate_group_mb_acquire(&acquire_group);

        if (was_online)
                urcu_qsbr_thread_online();
        else
                cmm_smp_mb();
}
#else /* !(CAA_BITS_PER_LONG < 64) */
void urcu_qsbr_synchronize_rcu(void)
{
        cmm_annotate_define(acquire_group);
        cmm_annotate_define(release_group);
        CDS_LIST_HEAD(qsreaders);
        unsigned long was_online;
        DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING);
        struct urcu_waiters waiters;

        was_online = urcu_qsbr_read_ongoing();

        /*
         * Mark the writer thread offline to make sure we don't wait for
         * our own quiescent state. This allows using synchronize_rcu()
         * in threads registered as readers.
         */
        if (was_online)
                urcu_qsbr_thread_offline();
        else
                cmm_smp_mb();
        cmm_annotate_group_mb_release(&release_group);

        /*
         * Add ourself to the gp_waiters queue of threads waiting for a
         * grace period. Proceed to perform the grace period only if we
         * are the first thread added into the queue.
         */
        if (urcu_wait_add(&gp_waiters, &wait) != 0) {
                /* Not first in queue: will be awakened by another thread. */
                urcu_adaptative_busy_wait(&wait);
                goto gp_end;
        }
        /* We won't need to wake ourself up */
        urcu_wait_set_state(&wait, URCU_WAIT_RUNNING);

        mutex_lock(&rcu_gp_lock);

        /*
         * Move all waiters into our local queue.
         */
        urcu_move_waiters(&waiters, &gp_waiters);

        mutex_lock(&rcu_registry_lock);

        if (cds_list_empty(&registry))
                goto out;

        /* Increment current G.P. */
        cmm_annotate_group_mem_release(&release_group, &urcu_qsbr_gp.ctr);
        uatomic_store(&urcu_qsbr_gp.ctr, urcu_qsbr_gp.ctr + URCU_QSBR_GP_CTR, CMM_RELAXED);

        /*
         * Must commit urcu_qsbr_gp.ctr update to memory before waiting for
         * quiescent state. Failure to do so could result in the writer
         * waiting forever while new readers are always accessing data
         * (no progress). Enforce compiler-order of store to urcu_qsbr_gp.ctr
         * before load URCU_TLS(urcu_qsbr_reader).ctr.
         */
        cmm_barrier();

        /*
         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
         * model easier to understand. It does not have a big performance impact
         * anyway, given this is the write-side.
         */
        cmm_smp_mb();

        /*
         * Wait for readers to observe the new count or be quiescent.
         * wait_for_readers() can release and re-acquire rcu_registry_lock
         * internally.
         */
        wait_for_readers(&registry, NULL, &qsreaders, &acquire_group);

        /*
         * Put quiescent reader list back into registry.
         */
        cds_list_splice(&qsreaders, &registry);
out:
        mutex_unlock(&rcu_registry_lock);
        mutex_unlock(&rcu_gp_lock);
        urcu_wake_all_waiters(&waiters);
gp_end:
        if (was_online)
                urcu_qsbr_thread_online();
        else
                cmm_smp_mb();

        cmm_annotate_group_mb_acquire(&acquire_group);
}
#endif /* !(CAA_BITS_PER_LONG < 64) */

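/*
 * Typical write-side usage of the grace period above (informal sketch; the
 * shared_ptr/old/new names are illustrative, not part of this library):
 *
 *        struct foo *new, *old;
 *
 *        new = malloc(sizeof(*new));
 *        new->value = 42;
 *        old = rcu_dereference(shared_ptr);
 *        rcu_assign_pointer(shared_ptr, new);
 *        urcu_qsbr_synchronize_rcu();        // wait for pre-existing readers
 *        free(old);
 *
 * Callers that must not block can instead hand "old" to call_rcu() (see
 * urcu-call-rcu-impl.h, included at the bottom of this file).
 */
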
/*
 * library wrappers to be used by non-LGPL compatible source code.
 */

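/*
 * How these wrappers are used (summary, assuming the usual liburcu build
 * conventions): LGPL-compatible code may #define _LGPL_SOURCE before
 * including <urcu/urcu-qsbr.h> to inline the static fast paths directly;
 * all other code transparently gets the exported functions below, e.g.:
 *
 *        #include <urcu/urcu-qsbr.h>        // without _LGPL_SOURCE
 *
 *        urcu_qsbr_read_lock();             // resolves to the wrapper below
 *        ...
 *        urcu_qsbr_read_unlock();
 */
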
void urcu_qsbr_read_lock(void)
{
        _urcu_qsbr_read_lock();
}

void urcu_qsbr_read_unlock(void)
{
        _urcu_qsbr_read_unlock();
}

int urcu_qsbr_read_ongoing(void)
{
        return _urcu_qsbr_read_ongoing();
}

void urcu_qsbr_quiescent_state(void)
{
        _urcu_qsbr_quiescent_state();
}

void urcu_qsbr_thread_offline(void)
{
        _urcu_qsbr_thread_offline();
}

void urcu_qsbr_thread_online(void)
{
        _urcu_qsbr_thread_online();
}

void urcu_qsbr_register_thread(void)
{
        URCU_TLS(urcu_qsbr_reader).tid = pthread_self();
        urcu_posix_assert(URCU_TLS(urcu_qsbr_reader).ctr == 0);

        mutex_lock(&rcu_registry_lock);
        urcu_posix_assert(!URCU_TLS(urcu_qsbr_reader).registered);
        URCU_TLS(urcu_qsbr_reader).registered = 1;
        cds_list_add(&URCU_TLS(urcu_qsbr_reader).node, &registry);
        mutex_unlock(&rcu_registry_lock);
        _urcu_qsbr_thread_online();
}
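
/*
 * Reader thread lifecycle example (informal sketch; error handling, the
 * protected data accesses and the about_to_block condition are hypothetical
 * placeholders):
 *
 *        urcu_qsbr_register_thread();
 *        for (;;) {
 *                urcu_qsbr_read_lock();
 *                // ... dereference RCU-protected pointers ...
 *                urcu_qsbr_read_unlock();
 *                urcu_qsbr_quiescent_state();  // report quiescence regularly
 *
 *                if (about_to_block) {
 *                        urcu_qsbr_thread_offline();
 *                        // ... blocking call ...
 *                        urcu_qsbr_thread_online();
 *                }
 *        }
 *        urcu_qsbr_unregister_thread();
 */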

void urcu_qsbr_unregister_thread(void)
{
        /*
         * We have to make the thread offline, otherwise we end up
         * deadlocking with a waiting writer.
         */
        _urcu_qsbr_thread_offline();
        urcu_posix_assert(URCU_TLS(urcu_qsbr_reader).registered);
        URCU_TLS(urcu_qsbr_reader).registered = 0;
        mutex_lock(&rcu_registry_lock);
        cds_list_del(&URCU_TLS(urcu_qsbr_reader).node);
        mutex_unlock(&rcu_registry_lock);
}

void urcu_qsbr_exit(void)
{
        /*
         * Assertion disabled because call_rcu threads are now rcu
         * readers, and left running at exit.
         * urcu_posix_assert(cds_list_empty(&registry));
         */
        urcu_call_rcu_exit();
}

DEFINE_RCU_FLAVOR(rcu_flavor);

#include "urcu-call-rcu-impl.h"
#include "urcu-defer-impl.h"
#include "urcu-poll-impl.h"