X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu-qsbr.c;h=d69138974722d881d9345a105f48a6c4870d8f45;hp=eb167d128be77125ce450b1bac808870fb994975;hb=cba82d7bd73c2d3d9772190d51314c59bd2ef309;hpb=f6b42f9c22f265a7c071a8b60c546a76faa5f69b diff --git a/urcu-qsbr.c b/urcu-qsbr.c index eb167d1..d691389 100644 --- a/urcu-qsbr.c +++ b/urcu-qsbr.c @@ -36,6 +36,7 @@ #include #include "urcu/wfcqueue.h" +#include "urcu/wfstack.h" #include "urcu/map/urcu-qsbr.h" #define BUILD_QSBR_LIB #include "urcu/static/urcu-qsbr.h" @@ -43,6 +44,7 @@ #include "urcu/tls-compat.h" #include "urcu-die.h" +#include "urcu-wait.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE @@ -78,6 +80,20 @@ DEFINE_URCU_TLS(unsigned int, rcu_rand_yield); static CDS_LIST_HEAD(registry); +struct gp_waiters_thread { + struct cds_wfs_node node; + struct urcu_wait wait; +}; + +/* + * Stack keeping threads awaiting to wait for a grace period. Contains + * struct gp_waiters_thread objects. + */ +static struct cds_wfs_stack gp_waiters = { + .head = CDS_WFS_END, + .lock = PTHREAD_MUTEX_INITIALIZER, +}; + static void mutex_lock(pthread_mutex_t *mutex) { int ret; @@ -116,9 +132,10 @@ static void wait_gp(void) NULL, NULL, 0); } -static void wait_for_readers(void) +static void wait_for_readers(struct cds_list_head *input_readers, + struct cds_list_head *cur_snap_readers, + struct cds_list_head *qsreaders) { - CDS_LIST_HEAD(qsreaders); int wait_loops = 0; struct rcu_reader *index, *tmp; @@ -136,18 +153,36 @@ static void wait_for_readers(void) * reads them in the opposite order). */ cmm_smp_wmb(); - cds_list_for_each_entry(index, ®istry, node) { + cds_list_for_each_entry(index, input_readers, node) { _CMM_STORE_SHARED(index->waiting, 1); } /* Write futex before read reader_gp */ cmm_smp_mb(); } - cds_list_for_each_entry_safe(index, tmp, ®istry, node) { - if (!rcu_gp_ongoing(&index->ctr)) - cds_list_move(&index->node, &qsreaders); + cds_list_for_each_entry_safe(index, tmp, input_readers, node) { + switch (rcu_reader_state(&index->ctr)) { + case RCU_READER_ACTIVE_CURRENT: + if (cur_snap_readers) { + cds_list_move(&index->node, + cur_snap_readers); + break; + } + /* Fall-through */ + case RCU_READER_INACTIVE: + cds_list_move(&index->node, qsreaders); + break; + case RCU_READER_ACTIVE_OLD: + /* + * Old snapshot. Leaving node in + * input_readers will make us busy-loop + * until the snapshot becomes current or + * the reader becomes inactive. + */ + break; + } } - if (cds_list_empty(®istry)) { + if (cds_list_empty(input_readers)) { if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) { /* Read reader_gp before write futex */ cmm_smp_mb(); @@ -166,8 +201,6 @@ static void wait_for_readers(void) } } } - /* put back the reader list in the registry */ - cds_list_splice(&qsreaders, ®istry); } /* @@ -178,7 +211,12 @@ static void wait_for_readers(void) #if (CAA_BITS_PER_LONG < 64) void synchronize_rcu(void) { + CDS_LIST_HEAD(cur_snap_readers); + CDS_LIST_HEAD(qsreaders); unsigned long was_online; + struct gp_waiters_thread gp_waiters_thread; + struct cds_wfs_head *gp_waiters_head; + struct cds_wfs_node *waiters_iter, *waiters_iter_n; was_online = URCU_TLS(rcu_reader).ctr; @@ -195,15 +233,33 @@ void synchronize_rcu(void) else cmm_smp_mb(); + /* + * Add ourself to gp_waiters stack of threads awaiting to wait + * for a grace period. Proceed to perform the grace period only + * if we are the first thread added into the stack. + */ + cds_wfs_node_init(&gp_waiters_thread.node); + urcu_wait_init(&gp_waiters_thread.wait); + if (cds_wfs_push(&gp_waiters, &gp_waiters_node) != 0) { + /* Not first in stack: will be awakened by another thread. */ + urcu_adaptative_busy_wait(&gp_waiters_thread.wait); + goto gp_end; + } + mutex_lock(&rcu_gp_lock); + /* + * Pop all waiters into our local stack head. + */ + gp_waiters_head = __cds_wfs_pop_all(&gp_waiters); + if (cds_list_empty(®istry)) goto out; /* * Wait for readers to observe original parity or be quiescent. */ - wait_for_readers(); + wait_for_readers(®istry, &cur_snap_readers, &qsreaders); /* * Must finish waiting for quiescent state for original parity @@ -244,10 +300,28 @@ void synchronize_rcu(void) /* * Wait for readers to observe new parity or be quiescent. */ - wait_for_readers(); + wait_for_readers(&cur_snap_readers, NULL, &qsreaders); + + /* + * Put quiescent reader list back into registry. + */ + cds_list_splice(&qsreaders, ®istry); out: mutex_unlock(&rcu_gp_lock); + /* Wake all waiters in our stack head, excluding ourself. */ + cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter, + waiters_iter_n) { + struct gp_waiters_thread *wt; + + wt = caa_container_of(waiters_iter, + struct gp_waiters_thread, node); + if (wt == &gp_waiters_thread) + continue; + urcu_adaptative_wake_up(&wt->wait); + } + +gp_end: /* * Finish waiting for reader threads before letting the old ptr being * freed. @@ -260,7 +334,11 @@ out: #else /* !(CAA_BITS_PER_LONG < 64) */ void synchronize_rcu(void) { + CDS_LIST_HEAD(qsreaders); unsigned long was_online; + struct gp_waiters_thread gp_waiters_thread; + struct cds_wfs_head *gp_waiters_head; + struct cds_wfs_node *waiters_iter, *waiters_iter_n; was_online = URCU_TLS(rcu_reader).ctr; @@ -274,7 +352,26 @@ void synchronize_rcu(void) else cmm_smp_mb(); + /* + * Add ourself to gp_waiters stack of threads awaiting to wait + * for a grace period. Proceed to perform the grace period only + * if we are the first thread added into the stack. + */ + cds_wfs_node_init(&gp_waiters_thread.node); + urcu_wait_init(&gp_waiters_thread.wait); + if (cds_wfs_push(&gp_waiters, &gp_waiters_thread.node) != 0) { + /* Not first in stack: will be awakened by another thread. */ + urcu_adaptative_busy_wait(&gp_waiters_thread.wait); + goto gp_end; + } + mutex_lock(&rcu_gp_lock); + + /* + * Pop all waiters into our local stack head. + */ + gp_waiters_head = __cds_wfs_pop_all(&gp_waiters); + if (cds_list_empty(®istry)) goto out; @@ -300,10 +397,28 @@ void synchronize_rcu(void) /* * Wait for readers to observe new count of be quiescent. */ - wait_for_readers(); + wait_for_readers(®istry, NULL, &qsreaders); + + /* + * Put quiescent reader list back into registry. + */ + cds_list_splice(&qsreaders, ®istry); out: mutex_unlock(&rcu_gp_lock); + /* Wake all waiters in our stack head, excluding ourself. */ + cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter, + waiters_iter_n) { + struct gp_waiters_thread *wt; + + wt = caa_container_of(waiters_iter, + struct gp_waiters_thread, node); + if (wt == &gp_waiters_thread) + continue; + urcu_adaptative_wake_up(&wt->wait); + } + +gp_end: if (was_online) rcu_thread_online(); else