From bc6c15bba37d0e6192c006c4d9815201b36d1988 Mon Sep 17 00:00:00 2001
From: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Date: Fri, 25 Sep 2009 17:49:31 -0400
Subject: [PATCH] Add futex support to accelerate synchronize_rcu() on UP

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
---
 urcu-qsbr-static.h | 29 ++++++++++++++++++++++++++++
 urcu-qsbr.c        | 37 ++++++++++++++++++++++++++++++++----
 urcu-static.h      | 44 ++++++++++++++++++++++++++++++++++++++-----
 urcu.c             | 47 +++++++++++++++++++++++++++++++++++++---------
 4 files changed, 139 insertions(+), 18 deletions(-)

diff --git a/urcu-qsbr-static.h b/urcu-qsbr-static.h
index c0467cd..87305cb 100644
--- a/urcu-qsbr-static.h
+++ b/urcu-qsbr-static.h
@@ -33,6 +33,8 @@
 #include <stdlib.h>
 #include <pthread.h>
 #include <assert.h>
+#include <syscall.h>
+#include <unistd.h>
 
 #include <compiler.h>
 #include <arch.h>
@@ -87,6 +89,10 @@
 		(_________p1);				\
 	})
 
+#define futex(...)	syscall(__NR_futex, __VA_ARGS__)
+#define FUTEX_WAIT	0
+#define FUTEX_WAKE	1
+
 /*
  * This code section can only be included in LGPL 2.1 compatible source code.
  * See below for the function call wrappers which can be used in code meant to
@@ -102,6 +108,11 @@
  */
#define KICK_READER_LOOPS 10000
 
+/*
+ * Active attempts to check for reader Q.S. before calling futex().
+ */
+#define RCU_QS_ACTIVE_ATTEMPTS 100
+
 #ifdef DEBUG_RCU
 #define rcu_assert(args...)	assert(args)
 #else
@@ -173,6 +184,20 @@
 extern unsigned long urcu_gp_ctr;
 
 extern unsigned long __thread rcu_reader_qs_gp;
+
+extern int gp_futex;
+
+/*
+ * Wake up the waiting synchronize_rcu(). Called from many concurrent threads.
+ */
+static inline void wake_up_gp(void)
+{
+	if (unlikely(atomic_read(&gp_futex) == -1)) {
+		atomic_set(&gp_futex, 0);
+		futex(&gp_futex, FUTEX_WAKE, 1,
+		      NULL, NULL, 0);
+	}
+}
 
 #if (BITS_PER_LONG < 64)
 static inline int rcu_gp_ongoing(unsigned long *value)
 {
@@ -208,6 +233,8 @@ static inline void _rcu_quiescent_state(void)
 {
 	smp_mb();
 	_STORE_SHARED(rcu_reader_qs_gp, _LOAD_SHARED(urcu_gp_ctr));
+	smp_mb();	/* write rcu_reader_qs_gp before read futex */
+	wake_up_gp();
 	smp_mb();
 }
 
@@ -215,6 +242,8 @@ static inline void _rcu_thread_offline(void)
 {
 	smp_mb();
 	STORE_SHARED(rcu_reader_qs_gp, 0);
+	smp_mb();	/* write rcu_reader_qs_gp before read futex */
+	wake_up_gp();
 }
 
 static inline void _rcu_thread_online(void)
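The new wake_up_gp() above and the wait_gp() helper added to urcu-qsbr.c below are the two halves of a futex hand-off: synchronize_rcu() advertises that it is about to sleep by moving gp_futex from 0 to -1, and a reader that reaches a quiescent state while gp_futex reads -1 resets it to 0 and issues FUTEX_WAKE. The standalone sketch below restates that protocol outside the library; it is an illustration, not code from this patch. GCC __atomic builtins stand in for liburcu's atomic_read()/atomic_set() and smp_mb(), and a reader_is_active() callback stands in for rcu_gp_ongoing():

/*
 * Minimal sketch of the gp_futex hand-off (illustration only).
 */
#include <sys/syscall.h>
#include <unistd.h>

#define FUTEX_WAIT	0
#define FUTEX_WAKE	1
#define futex(...)	syscall(__NR_futex, __VA_ARGS__)

static int gp_futex;	/* 0: nobody sleeping, -1: synchronize_rcu() may sleep */

/* Writer side: called after RCU_QS_ACTIVE_ATTEMPTS failed active checks. */
static void wait_gp_sketch(int (*reader_is_active)(void))
{
	__atomic_fetch_sub(&gp_futex, 1, __ATOMIC_SEQ_CST);	/* 0 -> -1 */
	/* The seq_cst operation above orders the futex write before
	 * re-reading the reader state, like the smp_mb() in wait_gp(). */
	if (!reader_is_active()) {
		/* Reader went quiescent meanwhile: undo and return. */
		__atomic_store_n(&gp_futex, 0, __ATOMIC_SEQ_CST);
	} else if (__atomic_load_n(&gp_futex, __ATOMIC_SEQ_CST) == -1) {
		/* Sleep unless a reader already reset gp_futex to 0;
		 * FUTEX_WAIT returns immediately if *uaddr != -1. */
		futex(&gp_futex, FUTEX_WAIT, -1, NULL, NULL, 0);
	}
}

/* Reader side: called right after publishing a quiescent state. */
static void wake_up_gp_sketch(void)
{
	if (__atomic_load_n(&gp_futex, __ATOMIC_SEQ_CST) == -1) {
		__atomic_store_n(&gp_futex, 0, __ATOMIC_SEQ_CST);
		futex(&gp_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
	}
}

The fast paths stay cheap: a reader that finds gp_futex at 0 pays only the read plus a barrier, and the writer attempts FUTEX_WAIT only after RCU_QS_ACTIVE_ATTEMPTS active checks fail. Sleeping instead of spinning is what helps on UP, where a busy-looping synchronize_rcu() would otherwise occupy the CPU that the not-yet-quiescent reader needs in order to make progress.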
diff --git a/urcu-qsbr.c b/urcu-qsbr.c
index 5c6eaa0..dac6649 100644
--- a/urcu-qsbr.c
+++ b/urcu-qsbr.c
@@ -39,6 +39,8 @@
 
 static pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+int gp_futex;
+
 /*
  * Global grace period counter.
  */
@@ -99,6 +101,27 @@ static void internal_urcu_unlock(void)
 	}
 }
 
+/*
+ * synchronize_rcu() waiting. Single thread.
+ */
+static void wait_gp(struct reader_registry *index)
+{
+	atomic_dec(&gp_futex);
+	smp_mb();	/* Write futex before read reader_gp */
+	if (!rcu_gp_ongoing(index->rcu_reader_qs_gp)) {
+		/* Read reader_gp before write futex */
+		smp_mb();
+		/* Callbacks are queued, don't wait. */
+		atomic_set(&gp_futex, 0);
+	} else {
+		/* Read reader_gp before read futex */
+		smp_rmb();
+		if (atomic_read(&gp_futex) == -1)
+			futex(&gp_futex, FUTEX_WAIT, -1,
+			      NULL, NULL, 0);
+	}
+}
+
 static void wait_for_quiescent_state(void)
 {
 	struct reader_registry *index;
@@ -109,13 +132,19 @@ static void wait_for_quiescent_state(void)
 	 * Wait for each thread rcu_reader_qs_gp count to become 0.
 	 */
 	for (index = registry; index < registry + num_readers; index++) {
+		int wait_loops = 0;
+
+		while (rcu_gp_ongoing(index->rcu_reader_qs_gp)) {
+			if (wait_loops++ == RCU_QS_ACTIVE_ATTEMPTS) {
+				wait_gp(index);
+			} else {
 #ifndef HAS_INCOHERENT_CACHES
-		while (rcu_gp_ongoing(index->rcu_reader_qs_gp))
-			cpu_relax();
+				cpu_relax();
 #else /* #ifndef HAS_INCOHERENT_CACHES */
-		while (rcu_gp_ongoing(index->rcu_reader_qs_gp))
-			smp_mb();
+				smp_mb();
 #endif /* #else #ifndef HAS_INCOHERENT_CACHES */
+			}
+		}
 	}
 }
 
diff --git a/urcu-static.h b/urcu-static.h
index f819e6f..3caa0f9 100644
--- a/urcu-static.h
+++ b/urcu-static.h
@@ -31,6 +31,8 @@
 
 #include <stdlib.h>
 #include <pthread.h>
+#include <syscall.h>
+#include <unistd.h>
 
 #include <compiler.h>
 #include <arch.h>
@@ -94,6 +96,10 @@
 		(_________p1);				\
 	})
 
+#define futex(...)	syscall(__NR_futex, __VA_ARGS__)
+#define FUTEX_WAIT	0
+#define FUTEX_WAKE	1
+
 /*
 * This code section can only be included in LGPL 2.1 compatible source code.
 * See below for the function call wrappers which can be used in code meant to
@@ -117,6 +123,11 @@
  */
 #define KICK_READER_LOOPS 10000
 
+/*
+ * Active attempts to check for reader Q.S. before calling futex().
+ */
+#define RCU_QS_ACTIVE_ATTEMPTS 100
+
 #ifdef DEBUG_RCU
 #define rcu_assert(args...)	assert(args)
 #else
@@ -209,6 +220,20 @@
 extern long urcu_gp_ctr;
 
 extern long __thread urcu_active_readers;
+
+extern int gp_futex;
+
+/*
+ * Wake up the waiting synchronize_rcu(). Called from many concurrent threads.
+ */
+static inline void wake_up_gp(void)
+{
+	if (unlikely(atomic_read(&gp_futex) == -1)) {
+		atomic_set(&gp_futex, 0);
+		futex(&gp_futex, FUTEX_WAKE, 1,
+		      NULL, NULL, 0);
+	}
+}
 
 static inline int rcu_old_gp_ongoing(long *value)
 {
@@ -244,15 +269,24 @@ static inline void _rcu_read_lock(void)
 
 static inline void _rcu_read_unlock(void)
 {
-	reader_barrier();
+	long tmp;
+
+	tmp = urcu_active_readers;
 	/*
 	 * Finish using rcu before decrementing the pointer.
 	 * See force_mb_all_threads().
-	 * Formally only needed for outermost nesting level, but leave barrier
-	 * in place for nested unlocks to remove a branch from the common case
-	 * (no nesting).
 	 */
-	_STORE_SHARED(urcu_active_readers, urcu_active_readers - RCU_GP_COUNT);
+	if (likely((tmp & RCU_GP_CTR_NEST_MASK) == RCU_GP_COUNT)) {
+		reader_barrier();
+		_STORE_SHARED(urcu_active_readers,
+			      urcu_active_readers - RCU_GP_COUNT);
+		/* write urcu_active_readers before read futex */
+		reader_barrier();
+		wake_up_gp();
+	} else {
+		_STORE_SHARED(urcu_active_readers,
+			      urcu_active_readers - RCU_GP_COUNT);
+	}
 }
 
 /**
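The _rcu_read_unlock() rewrite just above changes the trade-off that the removed comment described: instead of an unconditional barrier on every unlock, only the outermost unlock (nesting count exactly RCU_GP_COUNT) issues barriers and checks gp_futex via wake_up_gp(), while nested unlocks reduce to a plain decrement. The toy program below shows the nesting test that picks the path; the constants are made up for the example, and the real RCU_GP_COUNT and RCU_GP_CTR_NEST_MASK definitions live in urcu-static.h:

/*
 * Toy demonstration of the outermost-unlock test (illustration only).
 */
#include <assert.h>

#define RCU_GP_COUNT		1UL	/* one nesting level */
#define RCU_GP_CTR_NEST_MASK	0xffUL	/* low bits hold the nesting count */

static unsigned long urcu_active_readers;

static int outermost_unlock(void)
{
	/* True only when exactly one nesting level remains. */
	return (urcu_active_readers & RCU_GP_CTR_NEST_MASK) == RCU_GP_COUNT;
}

int main(void)
{
	urcu_active_readers += RCU_GP_COUNT;	/* rcu_read_lock() */
	urcu_active_readers += RCU_GP_COUNT;	/* nested rcu_read_lock() */

	/* Inner unlock: plain decrement, no barrier, no futex check. */
	assert(!outermost_unlock());
	urcu_active_readers -= RCU_GP_COUNT;

	/* Outermost unlock: barrier, decrement, barrier, wake_up_gp(). */
	assert(outermost_unlock());
	urcu_active_readers -= RCU_GP_COUNT;
	return 0;
}

Since the outermost unlock now has to read gp_futex anyway, the branch that the old comment avoided must be taken regardless, and taking it also lets nested unlocks skip the barrier entirely.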
diff --git a/urcu.c b/urcu.c
index 32f7488..07661a3 100644
--- a/urcu.c
+++ b/urcu.c
@@ -49,6 +49,8 @@ void urcu_init(void)
 
 static pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+int gp_futex;
+
 /*
  * Global grace period counter.
  * Contains the current RCU_GP_CTR_BIT.
@@ -128,19 +130,16 @@ static void switch_next_urcu_qparity(void)
 }
 
 #ifdef URCU_MB
-#ifdef HAS_INCOHERENT_CACHES
 static void force_mb_single_thread(struct reader_registry *index)
 {
 	smp_mb();
 }
-#endif /* #ifdef HAS_INCOHERENT_CACHES */
 
 static void force_mb_all_threads(void)
 {
 	smp_mb();
 }
 #else /* #ifdef URCU_MB */
-#ifdef HAS_INCOHERENT_CACHES
 static void force_mb_single_thread(struct reader_registry *index)
 {
 	assert(registry);
@@ -163,7 +162,6 @@ static void force_mb_single_thread(struct reader_registry *index)
 	}
 	smp_mb();	/* read ->need_mb before ending the barrier */
 }
-#endif /* #ifdef HAS_INCOHERENT_CACHES */
 
 static void force_mb_all_threads(void)
 {
@@ -208,6 +206,27 @@ static void force_mb_all_threads(void)
 }
 #endif /* #else #ifdef URCU_MB */
 
+/*
+ * synchronize_rcu() waiting. Single thread.
+ */
+static void wait_gp(struct reader_registry *index)
+{
+	atomic_dec(&gp_futex);
+	force_mb_single_thread(index); /* Write futex before read reader_gp */
+	if (!rcu_old_gp_ongoing(index->urcu_active_readers)) {
+		/* Read reader_gp before write futex */
+		force_mb_single_thread(index);
+		/* Callbacks are queued, don't wait. */
+		atomic_set(&gp_futex, 0);
+	} else {
+		/* Read reader_gp before read futex */
+		force_mb_single_thread(index);
+		if (atomic_read(&gp_futex) == -1)
+			futex(&gp_futex, FUTEX_WAIT, -1,
+			      NULL, NULL, 0);
+	}
+}
+
 void wait_for_quiescent_state(void)
 {
 	struct reader_registry *index;
@@ -218,20 +237,30 @@ void wait_for_quiescent_state(void)
 	 * Wait for each thread urcu_active_readers count to become 0.
 	 */
 	for (index = registry; index < registry + num_readers; index++) {
+		int wait_loops = 0;
 #ifndef HAS_INCOHERENT_CACHES
-		while (rcu_old_gp_ongoing(index->urcu_active_readers))
-			cpu_relax();
+		while (rcu_old_gp_ongoing(index->urcu_active_readers)) {
+			if (wait_loops++ == RCU_QS_ACTIVE_ATTEMPTS) {
+				wait_gp(index);
+			} else {
+				cpu_relax();
+			}
+		}
 #else /* #ifndef HAS_INCOHERENT_CACHES */
-		int wait_loops = 0;
 		/*
 		 * BUSY-LOOP. Force the reader thread to commit its
 		 * urcu_active_readers update to memory if we wait for too long.
 		 */
 		while (rcu_old_gp_ongoing(index->urcu_active_readers)) {
-			if (wait_loops++ == KICK_READER_LOOPS) {
+			switch (wait_loops++) {
+			case RCU_QS_ACTIVE_ATTEMPTS:
+				wait_gp(index);
+				break;
+			case KICK_READER_LOOPS:
 				force_mb_single_thread(index);
 				wait_loops = 0;
-			} else {
+				break;
+			default:
 				cpu_relax();
 			}
 		}
-- 
2.34.1
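On HAS_INCOHERENT_CACHES builds, the rewritten loop in wait_for_quiescent_state() above becomes a three-step ladder: spin with cpu_relax() until RCU_QS_ACTIVE_ATTEMPTS, then try to sleep on gp_futex via wait_gp(), and at KICK_READER_LOOPS force the reader's pending urcu_active_readers update out with force_mb_single_thread() before restarting the count. The sketch below restates that ladder standalone; it is an illustration only, and poll_reader(), sleep_on_futex() and kick_reader() are hypothetical stand-ins for rcu_old_gp_ongoing(), wait_gp() and force_mb_single_thread():

/*
 * Sketch of the writer's escalation ladder (illustration only).
 */
#define RCU_QS_ACTIVE_ATTEMPTS	100	/* spins before trying the futex */
#define KICK_READER_LOOPS	10000	/* spins before forcing a barrier */

static void wait_one_reader(int (*poll_reader)(void),
			    void (*sleep_on_futex)(void),
			    void (*kick_reader)(void))
{
	int wait_loops = 0;

	while (poll_reader()) {
		switch (wait_loops++) {
		case RCU_QS_ACTIVE_ATTEMPTS:
			sleep_on_futex();	/* yield the CPU to the reader */
			break;
		case KICK_READER_LOOPS:
			kick_reader();		/* force the reader's update out */
			wait_loops = 0;		/* restart the ladder */
			break;
		default:
			/* cpu_relax() in the real code */
			break;
		}
	}
}

Resetting wait_loops after a kick makes the ladder repeat, so a stubbornly active reader sees another round of spins, another futex attempt, and another kick, until it finally publishes a quiescent state.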