X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu-qsbr.c;h=786a80d6b16b1936dc62b71b43db18850854dd41;hp=5b341b5cfd6d97034b08ca5ea6df913a0c8a7736;hb=882f335739b978d1c55be2faeed077f315afe5d7;hpb=708d89f0b396d20522fe87cc80d91072bb9fcb02 diff --git a/urcu-qsbr.c b/urcu-qsbr.c index 5b341b5..786a80d 100644 --- a/urcu-qsbr.c +++ b/urcu-qsbr.c @@ -43,6 +43,7 @@ #include "urcu/tls-compat.h" #include "urcu-die.h" +#include "urcu-wait.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE @@ -78,6 +79,12 @@ DEFINE_URCU_TLS(unsigned int, rcu_rand_yield); static CDS_LIST_HEAD(registry); +/* + * Queue keeping threads awaiting to wait for a grace period. Contains + * struct gp_waiters_thread objects. + */ +static DEFINE_URCU_WAIT_QUEUE(gp_waiters); + static void mutex_lock(pthread_mutex_t *mutex) { int ret; @@ -198,8 +205,10 @@ void synchronize_rcu(void) CDS_LIST_HEAD(cur_snap_readers); CDS_LIST_HEAD(qsreaders); unsigned long was_online; + DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING); + struct urcu_waiters waiters; - was_online = URCU_TLS(rcu_reader).ctr; + was_online = rcu_read_ongoing(); /* All threads should read qparity before accessing data structure * where new ptr points to. In the "then" case, rcu_thread_offline @@ -214,8 +223,26 @@ void synchronize_rcu(void) else cmm_smp_mb(); + /* + * Add ourself to gp_waiters queue of threads awaiting to wait + * for a grace period. Proceed to perform the grace period only + * if we are the first thread added into the queue. + */ + if (urcu_wait_add(&gp_waiters, &wait) != 0) { + /* Not first in queue: will be awakened by another thread. */ + urcu_adaptative_busy_wait(&wait); + goto gp_end; + } + /* We won't need to wake ourself up */ + urcu_wait_set_state(&wait, URCU_WAIT_RUNNING); + mutex_lock(&rcu_gp_lock); + /* + * Move all waiters into our local queue. + */ + urcu_move_waiters(&waiters, &gp_waiters); + if (cds_list_empty(®istry)) goto out; @@ -271,7 +298,8 @@ void synchronize_rcu(void) cds_list_splice(&qsreaders, ®istry); out: mutex_unlock(&rcu_gp_lock); - + urcu_wake_all_waiters(&waiters); +gp_end: /* * Finish waiting for reader threads before letting the old ptr being * freed. @@ -286,8 +314,10 @@ void synchronize_rcu(void) { CDS_LIST_HEAD(qsreaders); unsigned long was_online; + DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING); + struct urcu_waiters waiters; - was_online = URCU_TLS(rcu_reader).ctr; + was_online = rcu_read_ongoing(); /* * Mark the writer thread offline to make sure we don't wait for @@ -299,7 +329,26 @@ void synchronize_rcu(void) else cmm_smp_mb(); + /* + * Add ourself to gp_waiters queue of threads awaiting to wait + * for a grace period. Proceed to perform the grace period only + * if we are the first thread added into the queue. + */ + if (urcu_wait_add(&gp_waiters, &wait) != 0) { + /* Not first in queue: will be awakened by another thread. */ + urcu_adaptative_busy_wait(&wait); + goto gp_end; + } + /* We won't need to wake ourself up */ + urcu_wait_set_state(&wait, URCU_WAIT_RUNNING); + mutex_lock(&rcu_gp_lock); + + /* + * Move all waiters into our local queue. + */ + urcu_move_waiters(&waiters, &gp_waiters); + if (cds_list_empty(®istry)) goto out; @@ -333,7 +382,8 @@ void synchronize_rcu(void) cds_list_splice(&qsreaders, ®istry); out: mutex_unlock(&rcu_gp_lock); - + urcu_wake_all_waiters(&waiters); +gp_end: if (was_online) rcu_thread_online(); else @@ -355,6 +405,11 @@ void rcu_read_unlock(void) _rcu_read_unlock(); } +int rcu_read_ongoing(void) +{ + return _rcu_read_ongoing(); +} + void rcu_quiescent_state(void) { _rcu_quiescent_state();