X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu-qsbr.c;h=d6adc5bfaafa9d4d851842bad7d398967b081eec;hp=7f747ed5f9ce8ef133df3c2d026b8b76154e2e9e;hb=92af1a30ca6a70945b167c31631c8598a626c71a;hpb=6362f68f024fd85fefe342b1f82d8787146c1ebb diff --git a/urcu-qsbr.c b/urcu-qsbr.c index 7f747ed..d6adc5b 100644 --- a/urcu-qsbr.c +++ b/urcu-qsbr.c @@ -36,7 +36,6 @@ #include #include "urcu/wfcqueue.h" -#include "urcu/wfstack.h" #include "urcu/map/urcu-qsbr.h" #define BUILD_QSBR_LIB #include "urcu/static/urcu-qsbr.h" @@ -44,6 +43,7 @@ #include "urcu/tls-compat.h" #include "urcu-die.h" +#include "urcu-wait.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE @@ -53,13 +53,7 @@ void __attribute__((destructor)) rcu_exit(void); static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER; - -int32_t rcu_gp_futex; - -/* - * Global grace period counter. - */ -unsigned long rcu_gp_ctr = RCU_GP_ONLINE; +struct rcu_gp rcu_gp = { .ctr = RCU_GP_ONLINE }; /* * Active attempts to check for reader Q.S. before calling futex(). @@ -80,33 +74,10 @@ DEFINE_URCU_TLS(unsigned int, rcu_rand_yield); static CDS_LIST_HEAD(registry); /* - * Number of busy-loop attempts before waiting on futex for grace period - * batching. - */ -#define RCU_AWAKE_ATTEMPTS 1000 - -enum adapt_wakeup_state { - /* AWAKE_WAITING is compared directly (futex compares it). */ - AWAKE_WAITING = 0, - /* non-zero are used as masks. */ - AWAKE_WAKEUP = (1 << 0), - AWAKE_AWAKENED = (1 << 1), - AWAKE_TEARDOWN = (1 << 2), -}; - -struct gp_waiters_thread { - struct cds_wfs_node node; - int32_t wait_futex; -}; - -/* - * Stack keeping threads awaiting to wait for a grace period. Contains + * Queue keeping threads awaiting to wait for a grace period. Contains * struct gp_waiters_thread objects. */ -static struct cds_wfs_stack gp_waiters = { - .head = CDS_WFS_END, - .lock = PTHREAD_MUTEX_INITIALIZER, -}; +static DEFINE_URCU_WAIT_QUEUE(gp_waiters); static void mutex_lock(pthread_mutex_t *mutex) { @@ -141,63 +112,11 @@ static void wait_gp(void) { /* Read reader_gp before read futex */ cmm_smp_rmb(); - if (uatomic_read(&rcu_gp_futex) == -1) - futex_noasync(&rcu_gp_futex, FUTEX_WAIT, -1, + if (uatomic_read(&rcu_gp.futex) == -1) + futex_noasync(&rcu_gp.futex, FUTEX_WAIT, -1, NULL, NULL, 0); } -/* - * Note: urcu_adaptative_wake_up needs "value" to stay allocated - * throughout its execution. In this scheme, the waiter owns the futex - * memory, and we only allow it to free this memory when it receives the - * AWAKE_TEARDOWN flag. - */ -static void urcu_adaptative_wake_up(int32_t *value) -{ - cmm_smp_mb(); - assert(uatomic_read(value) == AWAKE_WAITING); - uatomic_set(value, AWAKE_WAKEUP); - if (!(uatomic_read(value) & AWAKE_AWAKENED)) - futex_noasync(value, FUTEX_WAKE, 1, NULL, NULL, 0); - /* Allow teardown of "value" memory. */ - uatomic_or(value, AWAKE_TEARDOWN); -} - -/* - * Caller must initialize "value" to AWAKE_WAITING before passing its - * memory to waker thread. - */ -static void urcu_adaptative_busy_wait(int32_t *value) -{ - unsigned int i; - - /* Load and test condition before read futex */ - cmm_smp_rmb(); - for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) { - if (uatomic_read(value) != AWAKE_WAITING) - goto skip_futex_wait; - caa_cpu_relax(); - } - futex_noasync(value, FUTEX_WAIT, AWAKE_WAITING, NULL, NULL, 0); -skip_futex_wait: - - /* Tell waker thread than we are awakened. */ - uatomic_or(value, AWAKE_AWAKENED); - - /* - * Wait until waker thread lets us know it's ok to tear down - * memory allocated for value. - */ - for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) { - if (uatomic_read(value) & AWAKE_TEARDOWN) - break; - caa_cpu_relax(); - } - while (!(uatomic_read(value) & AWAKE_TEARDOWN)) - poll(NULL, 0, 10); - assert(uatomic_read(value) & AWAKE_TEARDOWN); -} - static void wait_for_readers(struct cds_list_head *input_readers, struct cds_list_head *cur_snap_readers, struct cds_list_head *qsreaders) @@ -208,12 +127,12 @@ static void wait_for_readers(struct cds_list_head *input_readers, /* * Wait for each thread URCU_TLS(rcu_reader).ctr to either * indicate quiescence (offline), or for them to observe the - * current rcu_gp_ctr value. + * current rcu_gp.ctr value. */ for (;;) { wait_loops++; if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) { - uatomic_set(&rcu_gp_futex, -1); + uatomic_set(&rcu_gp.futex, -1); /* * Write futex before write waiting (the other side * reads them in the opposite order). @@ -252,7 +171,7 @@ static void wait_for_readers(struct cds_list_head *input_readers, if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) { /* Read reader_gp before write futex */ cmm_smp_mb(); - uatomic_set(&rcu_gp_futex, 0); + uatomic_set(&rcu_gp.futex, 0); } break; } else { @@ -280,11 +199,10 @@ void synchronize_rcu(void) CDS_LIST_HEAD(cur_snap_readers); CDS_LIST_HEAD(qsreaders); unsigned long was_online; - struct gp_waiters_thread gp_waiters_thread; - struct cds_wfs_head *gp_waiters_head; - struct cds_wfs_node *waiters_iter, *waiters_iter_n; + DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING); + struct urcu_waiters waiters; - was_online = URCU_TLS(rcu_reader).ctr; + was_online = rcu_read_ongoing(); /* All threads should read qparity before accessing data structure * where new ptr points to. In the "then" case, rcu_thread_offline @@ -300,24 +218,24 @@ void synchronize_rcu(void) cmm_smp_mb(); /* - * Add ourself to gp_waiters stack of threads awaiting to wait + * Add ourself to gp_waiters queue of threads awaiting to wait * for a grace period. Proceed to perform the grace period only - * if we are the first thread added into the stack. + * if we are the first thread added into the queue. */ - cds_wfs_node_init(&gp_waiters_thread.node); - gp_waiters_thread.wait_futex = AWAKE_WAITING; - if (cds_wfs_push(&gp_waiters, &gp_waiters_node) != 0) { - /* Not first in stack: will be awakened by another thread. */ - urcu_adaptative_busy_wait(&gp_waiters_thread.wait_futex); + if (urcu_wait_add(&gp_waiters, &wait) != 0) { + /* Not first in queue: will be awakened by another thread. */ + urcu_adaptative_busy_wait(&wait); goto gp_end; } + /* We won't need to wake ourself up */ + urcu_wait_set_state(&wait, URCU_WAIT_RUNNING); mutex_lock(&rcu_gp_lock); /* - * Pop all waiters into our local stack head. + * Move all waiters into our local queue. */ - gp_waiters_head = __cds_wfs_pop_all(&gp_waiters); + urcu_move_waiters(&waiters, &gp_waiters); if (cds_list_empty(®istry)) goto out; @@ -329,11 +247,11 @@ void synchronize_rcu(void) /* * Must finish waiting for quiescent state for original parity - * before committing next rcu_gp_ctr update to memory. Failure + * before committing next rcu_gp.ctr update to memory. Failure * to do so could result in the writer waiting forever while new * readers are always accessing data (no progress). Enforce * compiler-order of load URCU_TLS(rcu_reader).ctr before store - * to rcu_gp_ctr. + * to rcu_gp.ctr. */ cmm_barrier(); @@ -345,13 +263,13 @@ void synchronize_rcu(void) cmm_smp_mb(); /* Switch parity: 0 -> 1, 1 -> 0 */ - CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR); + CMM_STORE_SHARED(rcu_gp.ctr, rcu_gp.ctr ^ RCU_GP_CTR); /* - * Must commit rcu_gp_ctr update to memory before waiting for + * Must commit rcu_gp.ctr update to memory before waiting for * quiescent state. Failure to do so could result in the writer * waiting forever while new readers are always accessing data - * (no progress). Enforce compiler-order of store to rcu_gp_ctr + * (no progress). Enforce compiler-order of store to rcu_gp.ctr * before load URCU_TLS(rcu_reader).ctr. */ cmm_barrier(); @@ -374,19 +292,7 @@ void synchronize_rcu(void) cds_list_splice(&qsreaders, ®istry); out: mutex_unlock(&rcu_gp_lock); - - /* Wake all waiters in our stack head, excluding ourself. */ - cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter, - waiters_iter_n) { - struct gp_waiters_thread *wt; - - wt = caa_container_of(waiters_iter, - struct gp_waiters_thread, node); - if (wt == &gp_waiters_thread) - continue; - urcu_adaptative_wake_up(&wt->wait_futex); - } - + urcu_wake_all_waiters(&waiters); gp_end: /* * Finish waiting for reader threads before letting the old ptr being @@ -402,11 +308,10 @@ void synchronize_rcu(void) { CDS_LIST_HEAD(qsreaders); unsigned long was_online; - struct gp_waiters_thread gp_waiters_thread; - struct cds_wfs_head *gp_waiters_head; - struct cds_wfs_node *waiters_iter, *waiters_iter_n; + DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING); + struct urcu_waiters waiters; - was_online = URCU_TLS(rcu_reader).ctr; + was_online = rcu_read_ongoing(); /* * Mark the writer thread offline to make sure we don't wait for @@ -419,36 +324,36 @@ void synchronize_rcu(void) cmm_smp_mb(); /* - * Add ourself to gp_waiters stack of threads awaiting to wait + * Add ourself to gp_waiters queue of threads awaiting to wait * for a grace period. Proceed to perform the grace period only - * if we are the first thread added into the stack. + * if we are the first thread added into the queue. */ - cds_wfs_node_init(&gp_waiters_thread.node); - gp_waiters_thread.wait_futex = AWAKE_WAITING; - if (cds_wfs_push(&gp_waiters, &gp_waiters_thread.node) != 0) { - /* Not first in stack: will be awakened by another thread. */ - urcu_adaptative_busy_wait(&gp_waiters_thread.wait_futex); + if (urcu_wait_add(&gp_waiters, &wait) != 0) { + /* Not first in queue: will be awakened by another thread. */ + urcu_adaptative_busy_wait(&wait); goto gp_end; } + /* We won't need to wake ourself up */ + urcu_wait_set_state(&wait, URCU_WAIT_RUNNING); mutex_lock(&rcu_gp_lock); /* - * Pop all waiters into our local stack head. + * Move all waiters into our local queue. */ - gp_waiters_head = __cds_wfs_pop_all(&gp_waiters); + urcu_move_waiters(&waiters, &gp_waiters); if (cds_list_empty(®istry)) goto out; /* Increment current G.P. */ - CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR); + CMM_STORE_SHARED(rcu_gp.ctr, rcu_gp.ctr + RCU_GP_CTR); /* - * Must commit rcu_gp_ctr update to memory before waiting for + * Must commit rcu_gp.ctr update to memory before waiting for * quiescent state. Failure to do so could result in the writer * waiting forever while new readers are always accessing data - * (no progress). Enforce compiler-order of store to rcu_gp_ctr + * (no progress). Enforce compiler-order of store to rcu_gp.ctr * before load URCU_TLS(rcu_reader).ctr. */ cmm_barrier(); @@ -471,19 +376,7 @@ void synchronize_rcu(void) cds_list_splice(&qsreaders, ®istry); out: mutex_unlock(&rcu_gp_lock); - - /* Wake all waiters in our stack head, excluding ourself. */ - cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter, - waiters_iter_n) { - struct gp_waiters_thread *wt; - - wt = caa_container_of(waiters_iter, - struct gp_waiters_thread, node); - if (wt == &gp_waiters_thread) - continue; - urcu_adaptative_wake_up(&wt->wait_futex); - } - + urcu_wake_all_waiters(&waiters); gp_end: if (was_online) rcu_thread_online(); @@ -506,6 +399,11 @@ void rcu_read_unlock(void) _rcu_read_unlock(); } +int rcu_read_ongoing(void) +{ + return _rcu_read_ongoing(); +} + void rcu_quiescent_state(void) { _rcu_quiescent_state();