X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=urcu.c;h=a78c02bdca6a71f9e7f66c76295fc8f1f33cdd37;hb=15302e2854ad9f1377ec81e331c1bec7a54a5621;hp=2d5c510aa6744418c5557d642b451e0afae415cd;hpb=c94886840694378921968b490ed1e09d1af3afb3;p=urcu.git diff --git a/urcu.c b/urcu.c index 2d5c510..a78c02b 100644 --- a/urcu.c +++ b/urcu.c @@ -43,6 +43,7 @@ #include "urcu/tls-compat.h" #include "urcu-die.h" +#include "urcu-wait.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE @@ -106,6 +107,12 @@ DEFINE_URCU_TLS(unsigned int, rcu_rand_yield); static CDS_LIST_HEAD(registry); +/* + * Queue keeping threads awaiting to wait for a grace period. Contains + * struct gp_waiters_thread objects. + */ +static DEFINE_URCU_WAIT_QUEUE(gp_waiters); + static void mutex_lock(pthread_mutex_t *mutex) { int ret; @@ -215,9 +222,10 @@ static void wait_gp(void) NULL, NULL, 0); } -static void wait_for_readers(void) +static void wait_for_readers(struct cds_list_head *input_readers, + struct cds_list_head *cur_snap_readers, + struct cds_list_head *qsreaders) { - CDS_LIST_HEAD(qsreaders); int wait_loops = 0; struct rcu_reader *index, *tmp; @@ -234,13 +242,31 @@ static void wait_for_readers(void) smp_mb_master(RCU_MB_GROUP); } - cds_list_for_each_entry_safe(index, tmp, ®istry, node) { - if (!rcu_gp_ongoing(&index->ctr)) - cds_list_move(&index->node, &qsreaders); + cds_list_for_each_entry_safe(index, tmp, input_readers, node) { + switch (rcu_reader_state(&index->ctr)) { + case RCU_READER_ACTIVE_CURRENT: + if (cur_snap_readers) { + cds_list_move(&index->node, + cur_snap_readers); + break; + } + /* Fall-through */ + case RCU_READER_INACTIVE: + cds_list_move(&index->node, qsreaders); + break; + case RCU_READER_ACTIVE_OLD: + /* + * Old snapshot. Leaving node in + * input_readers will make us busy-loop + * until the snapshot becomes current or + * the reader becomes inactive. + */ + break; + } } #ifndef HAS_INCOHERENT_CACHES - if (cds_list_empty(®istry)) { + if (cds_list_empty(input_readers)) { if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) { /* Read reader_gp before write futex */ smp_mb_master(RCU_MB_GROUP); @@ -259,7 +285,7 @@ static void wait_for_readers(void) * URCU_TLS(rcu_reader).ctr update to memory if we wait * for too long. */ - if (cds_list_empty(®istry)) { + if (cds_list_empty(input_readers)) { if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) { /* Read reader_gp before write futex */ smp_mb_master(RCU_MB_GROUP); @@ -281,14 +307,40 @@ static void wait_for_readers(void) } #endif /* #else #ifndef HAS_INCOHERENT_CACHES */ } - /* put back the reader list in the registry */ - cds_list_splice(&qsreaders, ®istry); } void synchronize_rcu(void) { + CDS_LIST_HEAD(cur_snap_readers); + CDS_LIST_HEAD(qsreaders); + DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING); + struct urcu_waiters waiters; + + /* + * Add ourself to gp_waiters queue of threads awaiting to wait + * for a grace period. Proceed to perform the grace period only + * if we are the first thread added into the queue. + * The implicit memory barrier before urcu_wait_add() + * orders prior memory accesses of threads put into the wait + * queue before their insertion into the wait queue. + */ + if (urcu_wait_add(&gp_waiters, &wait) != 0) { + /* Not first in queue: will be awakened by another thread. */ + urcu_adaptative_busy_wait(&wait); + /* Order following memory accesses after grace period. */ + cmm_smp_mb(); + return; + } + /* We won't need to wake ourself up */ + urcu_wait_set_state(&wait, URCU_WAIT_RUNNING); + mutex_lock(&rcu_gp_lock); + /* + * Move all waiters into our local queue. + */ + urcu_move_waiters(&waiters, &gp_waiters); + if (cds_list_empty(®istry)) goto out; @@ -301,7 +353,7 @@ void synchronize_rcu(void) /* * Wait for readers to observe original parity or be quiescent. */ - wait_for_readers(); + wait_for_readers(®istry, &cur_snap_readers, &qsreaders); /* * Must finish waiting for quiescent state for original parity before @@ -341,7 +393,12 @@ void synchronize_rcu(void) /* * Wait for readers to observe new parity or be quiescent. */ - wait_for_readers(); + wait_for_readers(&cur_snap_readers, NULL, &qsreaders); + + /* + * Put quiescent reader list back into registry. + */ + cds_list_splice(&qsreaders, ®istry); /* Finish waiting for reader threads before letting the old ptr being * freed. Must be done within rcu_gp_lock because it iterates on reader @@ -349,6 +406,13 @@ void synchronize_rcu(void) smp_mb_master(RCU_MB_GROUP); out: mutex_unlock(&rcu_gp_lock); + + /* + * Wakeup waiters only after we have completed the grace period + * and have ensured the memory barriers at the end of the grace + * period have been issued. + */ + urcu_wake_all_waiters(&waiters); } /* @@ -365,6 +429,11 @@ void rcu_read_unlock(void) _rcu_read_unlock(); } +int rcu_read_ongoing(void) +{ + return _rcu_read_ongoing(); +} + void rcu_register_thread(void) { URCU_TLS(rcu_reader).tid = pthread_self();