X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=urcu-qsbr.c;h=b01294accd3fd0cdb3bbe5934b5cff582b4eabf0;hb=3f16f0751bef22e9b0de5110dbbd3db1586433d0;hp=d6adc5bfaafa9d4d851842bad7d398967b081eec;hpb=4de0cd31491bcb93a19c14fc1eb2a2a23ce12855;p=urcu.git diff --git a/urcu-qsbr.c b/urcu-qsbr.c index d6adc5b..b01294a 100644 --- a/urcu-qsbr.c +++ b/urcu-qsbr.c @@ -52,7 +52,21 @@ void __attribute__((destructor)) rcu_exit(void); +/* + * rcu_gp_lock ensures mutual exclusion between threads calling + * synchronize_rcu(). + */ static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER; +/* + * rcu_registry_lock ensures mutual exclusion between threads + * registering and unregistering themselves to/from the registry, and + * with threads reading that registry from synchronize_rcu(). However, + * this lock is not held all the way through the completion of awaiting + * for the grace period. It is sporadically released between iterations + * on the registry. + * rcu_registry_lock may nest inside rcu_gp_lock. + */ +static pthread_mutex_t rcu_registry_lock = PTHREAD_MUTEX_INITIALIZER; struct rcu_gp rcu_gp = { .ctr = RCU_GP_ONLINE }; /* @@ -64,11 +78,11 @@ struct rcu_gp rcu_gp = { .ctr = RCU_GP_ONLINE }; * Written to only by each individual reader. Read by both the reader and the * writers. */ -DEFINE_URCU_TLS(struct rcu_reader, rcu_reader); +__DEFINE_URCU_TLS_GLOBAL(struct rcu_reader, rcu_reader); #ifdef DEBUG_YIELD unsigned int rcu_yield_active; -DEFINE_URCU_TLS(unsigned int, rcu_rand_yield); +__DEFINE_URCU_TLS_GLOBAL(unsigned int, rcu_rand_yield); #endif static CDS_LIST_HEAD(registry); @@ -117,11 +131,15 @@ static void wait_gp(void) NULL, NULL, 0); } +/* + * Always called with rcu_registry lock held. Releases this lock between + * iterations and grabs it again. Holds the lock when it returns. + */ static void wait_for_readers(struct cds_list_head *input_readers, struct cds_list_head *cur_snap_readers, struct cds_list_head *qsreaders) { - int wait_loops = 0; + unsigned int wait_loops = 0; struct rcu_reader *index, *tmp; /* @@ -130,7 +148,8 @@ static void wait_for_readers(struct cds_list_head *input_readers, * current rcu_gp.ctr value. */ for (;;) { - wait_loops++; + if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS) + wait_loops++; if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) { uatomic_set(&rcu_gp.futex, -1); /* @@ -175,6 +194,8 @@ static void wait_for_readers(struct cds_list_head *input_readers, } break; } else { + /* Temporarily unlock the registry lock. */ + mutex_unlock(&rcu_registry_lock); if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) { wait_gp(); } else { @@ -184,6 +205,8 @@ static void wait_for_readers(struct cds_list_head *input_readers, cmm_smp_mb(); #endif /* #else #ifndef HAS_INCOHERENT_CACHES */ } + /* Re-lock the registry lock before the next loop. */ + mutex_lock(&rcu_registry_lock); } } } @@ -237,11 +260,15 @@ void synchronize_rcu(void) */ urcu_move_waiters(&waiters, &gp_waiters); + mutex_lock(&rcu_registry_lock); + if (cds_list_empty(®istry)) goto out; /* * Wait for readers to observe original parity or be quiescent. + * wait_for_readers() can release and grab again rcu_registry_lock + * interally. */ wait_for_readers(®istry, &cur_snap_readers, &qsreaders); @@ -283,6 +310,8 @@ void synchronize_rcu(void) /* * Wait for readers to observe new parity or be quiescent. + * wait_for_readers() can release and grab again rcu_registry_lock + * interally. */ wait_for_readers(&cur_snap_readers, NULL, &qsreaders); @@ -291,6 +320,7 @@ void synchronize_rcu(void) */ cds_list_splice(&qsreaders, ®istry); out: + mutex_unlock(&rcu_registry_lock); mutex_unlock(&rcu_gp_lock); urcu_wake_all_waiters(&waiters); gp_end: @@ -343,6 +373,8 @@ void synchronize_rcu(void) */ urcu_move_waiters(&waiters, &gp_waiters); + mutex_lock(&rcu_registry_lock); + if (cds_list_empty(®istry)) goto out; @@ -367,6 +399,8 @@ void synchronize_rcu(void) /* * Wait for readers to observe new count of be quiescent. + * wait_for_readers() can release and grab again rcu_registry_lock + * interally. */ wait_for_readers(®istry, NULL, &qsreaders); @@ -375,6 +409,7 @@ void synchronize_rcu(void) */ cds_list_splice(&qsreaders, ®istry); out: + mutex_unlock(&rcu_registry_lock); mutex_unlock(&rcu_gp_lock); urcu_wake_all_waiters(&waiters); gp_end: @@ -424,9 +459,9 @@ void rcu_register_thread(void) URCU_TLS(rcu_reader).tid = pthread_self(); assert(URCU_TLS(rcu_reader).ctr == 0); - mutex_lock(&rcu_gp_lock); + mutex_lock(&rcu_registry_lock); cds_list_add(&URCU_TLS(rcu_reader).node, ®istry); - mutex_unlock(&rcu_gp_lock); + mutex_unlock(&rcu_registry_lock); _rcu_thread_online(); } @@ -437,9 +472,9 @@ void rcu_unregister_thread(void) * with a waiting writer. */ _rcu_thread_offline(); - mutex_lock(&rcu_gp_lock); + mutex_lock(&rcu_registry_lock); cds_list_del(&URCU_TLS(rcu_reader).node); - mutex_unlock(&rcu_gp_lock); + mutex_unlock(&rcu_registry_lock); } void rcu_exit(void)