Add futex support to accelerate synchronize_rcu() on UP
author     Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
           Fri, 25 Sep 2009 21:49:31 +0000 (17:49 -0400)
committer  Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
           Fri, 25 Sep 2009 21:49:31 +0000 (17:49 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
urcu-qsbr-static.h
urcu-qsbr.c
urcu-static.h
urcu.c
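The patch replaces pure busy-waiting in synchronize_rcu() with an adaptive scheme: poll the reader state a bounded number of times, then sleep on a futex that readers wake when they reach a quiescent state or exit their outermost read-side critical section. As a reading aid, here is a minimal sketch of the underlying wait/wake pairing, assuming Linux and glibc's syscall(); the GCC __sync builtins and the CAS-based waker are illustrative stand-ins for the patch's atomic_read()/atomic_set() helpers.

#include <syscall.h>
#include <unistd.h>

#define futex(...)      syscall(__NR_futex, __VA_ARGS__)
#define FUTEX_WAIT      0
#define FUTEX_WAKE      1

static int gp_futex;

/* Waiter (synchronize_rcu() side): advertise intent by moving gp_futex
 * from 0 to -1, then sleep only while the kernel still sees -1. The
 * kernel re-checks the value atomically under FUTEX_WAIT, so a wake
 * racing between the decrement and the sleep is not lost. The patch's
 * wait_gp() additionally re-checks the reader's state before sleeping. */
static void wait_side(void)
{
        __sync_fetch_and_sub(&gp_futex, 1);     /* 0 -> -1 */
        futex(&gp_futex, FUTEX_WAIT, -1, NULL, NULL, 0);
}

/* Waker (reader side): issue the syscall only when a waiter has
 * advertised itself, so the common case stays a single load. */
static void wake_side(void)
{
        if (__sync_val_compare_and_swap(&gp_futex, -1, 0) == -1)
                futex(&gp_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
}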

diff --git a/urcu-qsbr-static.h b/urcu-qsbr-static.h
index c0467cd0e1606c87a522e56d2299716cf6402b3e..87305cb694a9bcc4e175f37829736fcdb81d9686 100644
--- a/urcu-qsbr-static.h
+++ b/urcu-qsbr-static.h
@@ -33,6 +33,8 @@
 #include <pthread.h>
 #include <assert.h>
 #include <limits.h>
+#include <syscall.h>
+#include <unistd.h>
 
 #include <compiler.h>
 #include <arch.h>
                                (_________p1);                          \
                                })
 
+#define futex(...)             syscall(__NR_futex, __VA_ARGS__)
+#define FUTEX_WAIT             0
+#define FUTEX_WAKE             1
+
 /*
  * This code section can only be included in LGPL 2.1 compatible source code.
  * See below for the function call wrappers which can be used in code meant to
  */
 #define KICK_READER_LOOPS 10000
 
+/*
+ * Number of active attempts to check for reader Q.S. before calling futex().
+ */
+#define RCU_QS_ACTIVE_ATTEMPTS 100
+
 #ifdef DEBUG_RCU
 #define rcu_assert(args...)    assert(args)
 #else
@@ -173,6 +184,20 @@ extern unsigned long urcu_gp_ctr;
 
 extern unsigned long __thread rcu_reader_qs_gp;
 
+extern int gp_futex;
+
+/*
+ * Wake up a waiting synchronize_rcu(). Called from many concurrent threads.
+ */
+static inline void wake_up_gp(void)
+{
+       if (unlikely(atomic_read(&gp_futex) == -1)) {
+               atomic_set(&gp_futex, 0);
+               futex(&gp_futex, FUTEX_WAKE, 1,
+                     NULL, NULL, 0);
+       }
+}
+
 #if (BITS_PER_LONG < 64)
 static inline int rcu_gp_ongoing(unsigned long *value)
 {
@@ -208,6 +233,8 @@ static inline void _rcu_quiescent_state(void)
 {
        smp_mb();       
        _STORE_SHARED(rcu_reader_qs_gp, _LOAD_SHARED(urcu_gp_ctr));
+       smp_mb();       /* write rcu_reader_qs_gp before read futex */
+       wake_up_gp();
        smp_mb();
 }
 
@@ -215,6 +242,8 @@ static inline void _rcu_thread_offline(void)
 {
        smp_mb();
        STORE_SHARED(rcu_reader_qs_gp, 0);
+       smp_mb();       /* write rcu_reader_qs_gp before read futex */
+       wake_up_gp();
 }
 
 static inline void _rcu_thread_online(void)
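The smp_mb() calls added around wake_up_gp() above pair with the "Write futex before read reader_gp" barrier in wait_gp() (added to urcu-qsbr.c below) in a store-buffering (Dekker) pattern. A sketch of the pairing, with barriers abbreviated and the registry plumbing omitted (a simplified view, not the literal code):

/*
 *   reader                              synchronize_rcu()
 *   ------------------------------      ------------------------------
 *   STORE rcu_reader_qs_gp = ctr        STORE gp_futex = -1
 *   smp_mb()                            smp_mb()
 *   LOAD  gp_futex                      LOAD  rcu_reader_qs_gp
 *
 * At least one side must observe the other's store: either the writer
 * sees the quiescent state and skips the sleep, or the reader sees
 * gp_futex == -1 and issues the FUTEX_WAKE. The remaining window is
 * closed by the kernel re-checking gp_futex == -1 under FUTEX_WAIT.
 */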
diff --git a/urcu-qsbr.c b/urcu-qsbr.c
index 5c6eaa014721267da7de5f85fff22835d41a80b2..dac664935d7b898bd3dcce494845c7471c09749d 100644
--- a/urcu-qsbr.c
+++ b/urcu-qsbr.c
@@ -39,6 +39,8 @@
 
 static pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+int gp_futex;
+
 /*
  * Global grace period counter.
  */
@@ -99,6 +101,27 @@ static void internal_urcu_unlock(void)
        }
 }
 
+/*
+ * synchronize_rcu() waiting: wait for a single reader thread's quiescent state.
+ */
+static void wait_gp(struct reader_registry *index)
+{
+       atomic_dec(&gp_futex);
+       smp_mb(); /* Write futex before read reader_gp */
+       if (!rcu_gp_ongoing(index->rcu_reader_qs_gp)) {
+               /* Read reader_gp before write futex */
+               smp_mb();
+               /* Callbacks are queued, don't wait. */
+               atomic_set(&gp_futex, 0);
+       } else {
+               /* Read reader_gp before read futex */
+               smp_rmb();
+               if (atomic_read(&gp_futex) == -1)
+                       futex(&gp_futex, FUTEX_WAIT, -1,
+                             NULL, NULL, 0);
+       }
+}
+
 static void wait_for_quiescent_state(void)
 {
        struct reader_registry *index;
@@ -109,13 +132,19 @@ static void wait_for_quiescent_state(void)
         * Wait for each thread rcu_reader_qs_gp count to become 0.
         */
        for (index = registry; index < registry + num_readers; index++) {
+               int wait_loops = 0;
+
+               while (rcu_gp_ongoing(index->rcu_reader_qs_gp)) {
+                       if (wait_loops++ == RCU_QS_ACTIVE_ATTEMPTS) {
+                               wait_gp(index);
+                       } else {
 #ifndef HAS_INCOHERENT_CACHES
-               while (rcu_gp_ongoing(index->rcu_reader_qs_gp))
-                       cpu_relax();
+                               cpu_relax();
 #else /* #ifndef HAS_INCOHERENT_CACHES */
-               while (rcu_gp_ongoing(index->rcu_reader_qs_gp))
-                       smp_mb();
+                               smp_mb();
 #endif /* #else #ifndef HAS_INCOHERENT_CACHES */
+                       }
+               }
        }
 }
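A consolidated view of the new per-reader wait loop, as a sketch: reader_gp_ongoing() and wait_gp_on() are hypothetical stand-ins for rcu_gp_ongoing() and wait_gp() with the registry argument elided. Note that wait_loops keeps incrementing past RCU_QS_ACTIVE_ATTEMPTS, so the equality test fires at most once per reader: if the reader is still active after the futex sleep, the loop falls back to spinning.

#define RCU_QS_ACTIVE_ATTEMPTS 100

static void wait_for_one_reader(unsigned long *reader_qs_gp)
{
        int wait_loops = 0;

        while (reader_gp_ongoing(reader_qs_gp)) {
                if (wait_loops++ == RCU_QS_ACTIVE_ATTEMPTS)
                        wait_gp_on(reader_qs_gp);   /* sleep on gp_futex */
                else
                        cpu_relax();                /* cheap active wait */
        }
}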
 
diff --git a/urcu-static.h b/urcu-static.h
index f819e6fc9aa07e9f4e58329661bf5a494936bb25..3caa0f93d55058528a07c5fb20ba3a0e80191e79 100644
--- a/urcu-static.h
+++ b/urcu-static.h
@@ -31,6 +31,8 @@
 
 #include <stdlib.h>
 #include <pthread.h>
+#include <syscall.h>
+#include <unistd.h>
 
 #include <compiler.h>
 #include <arch.h>
                                (_________p1);                          \
                                })
 
+#define futex(...)             syscall(__NR_futex, __VA_ARGS__)
+#define FUTEX_WAIT             0
+#define FUTEX_WAKE             1
+
 /*
  * This code section can only be included in LGPL 2.1 compatible source code.
  * See below for the function call wrappers which can be used in code meant to
  */
 #define KICK_READER_LOOPS 10000
 
+/*
+ * Number of active attempts to check for reader Q.S. before calling futex().
+ */
+#define RCU_QS_ACTIVE_ATTEMPTS 100
+
 #ifdef DEBUG_RCU
 #define rcu_assert(args...)    assert(args)
 #else
@@ -209,6 +220,20 @@ extern long urcu_gp_ctr;
 
 extern long __thread urcu_active_readers;
 
+extern int gp_futex;
+
+/*
+ * Wake up a waiting synchronize_rcu(). Called from many concurrent threads.
+ */
+static inline void wake_up_gp(void)
+{
+       if (unlikely(atomic_read(&gp_futex) == -1)) {
+               atomic_set(&gp_futex, 0);
+               futex(&gp_futex, FUTEX_WAKE, 1,
+                     NULL, NULL, 0);
+       }
+}
+
 static inline int rcu_old_gp_ongoing(long *value)
 {
        long v;
@@ -244,15 +269,24 @@ static inline void _rcu_read_lock(void)
 
 static inline void _rcu_read_unlock(void)
 {
-       reader_barrier();
+       long tmp;
+
+       tmp = urcu_active_readers;
        /*
         * Finish using rcu before decrementing the pointer.
         * See force_mb_all_threads().
-        * Formally only needed for outermost nesting level, but leave barrier
-        * in place for nested unlocks to remove a branch from the common case
-        * (no nesting).
         */
-       _STORE_SHARED(urcu_active_readers, urcu_active_readers - RCU_GP_COUNT);
+       if (likely((tmp & RCU_GP_CTR_NEST_MASK) == RCU_GP_COUNT)) {
+               reader_barrier();
+               _STORE_SHARED(urcu_active_readers,
+                             urcu_active_readers - RCU_GP_COUNT);
+               /* write urcu_active_readers before read futex */
+               reader_barrier();
+               wake_up_gp();
+       } else {
+               _STORE_SHARED(urcu_active_readers,
+                             urcu_active_readers - RCU_GP_COUNT);
+       }
 }
 
 /**
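The rewritten _rcu_read_unlock() trades the formerly unconditional barrier for a nesting test: only the outermost unlock publishes the decrement between barriers and checks for a sleeping synchronize_rcu(); nested unlocks take the plain-decrement path. An illustrative reduction of the hunk above (one assumption: tmp is reused for the decrement, where the patch reloads urcu_active_readers):

static inline void read_unlock_sketch(void)
{
        long tmp = urcu_active_readers;

        if ((tmp & RCU_GP_CTR_NEST_MASK) == RCU_GP_COUNT) {
                /* Outermost unlock: order the critical section before
                 * the decrement, then check for a sleeping writer. */
                reader_barrier();
                _STORE_SHARED(urcu_active_readers, tmp - RCU_GP_COUNT);
                reader_barrier();   /* write count before read futex */
                wake_up_gp();
        } else {
                /* Nested unlock: no barrier, no wakeup needed. */
                _STORE_SHARED(urcu_active_readers, tmp - RCU_GP_COUNT);
        }
}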
diff --git a/urcu.c b/urcu.c
index 32f74880492ce40527c8b5b40560249eac485b90..07661a3cb448b34505f24a8552a421741bb282bd 100644
--- a/urcu.c
+++ b/urcu.c
@@ -49,6 +49,8 @@ void urcu_init(void)
 
 static pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+int gp_futex;
+
 /*
  * Global grace period counter.
  * Contains the current RCU_GP_CTR_BIT.
@@ -128,19 +130,16 @@ static void switch_next_urcu_qparity(void)
 }
 
 #ifdef URCU_MB
-#ifdef HAS_INCOHERENT_CACHES
 static void force_mb_single_thread(struct reader_registry *index)
 {
        smp_mb();
 }
-#endif /* #ifdef HAS_INCOHERENT_CACHES */
 
 static void force_mb_all_threads(void)
 {
        smp_mb();
 }
 #else /* #ifdef URCU_MB */
-#ifdef HAS_INCOHERENT_CACHES
 static void force_mb_single_thread(struct reader_registry *index)
 {
        assert(registry);
@@ -163,7 +162,6 @@ static void force_mb_single_thread(struct reader_registry *index)
        }
        smp_mb();       /* read ->need_mb before ending the barrier */
 }
-#endif /* #ifdef HAS_INCOHERENT_CACHES */
 
 static void force_mb_all_threads(void)
 {
@@ -208,6 +206,27 @@ static void force_mb_all_threads(void)
 }
 #endif /* #else #ifdef URCU_MB */
 
+/*
+ * synchronize_rcu() waiting: wait for a single reader thread to pass the grace period.
+ */
+static void wait_gp(struct reader_registry *index)
+{
+       atomic_dec(&gp_futex);
+       force_mb_single_thread(index); /* Write futex before read reader_gp */
+       if (!rcu_old_gp_ongoing(index->urcu_active_readers)) {
+               /* Read reader_gp before write futex */
+               force_mb_single_thread(index);
+               /* Callbacks are queued, don't wait. */
+               atomic_set(&gp_futex, 0);
+       } else {
+               /* Read reader_gp before read futex */
+               force_mb_single_thread(index);
+               if (atomic_read(&gp_futex) == -1)
+                       futex(&gp_futex, FUTEX_WAIT, -1,
+                             NULL, NULL, 0);
+       }
+}
+
 void wait_for_quiescent_state(void)
 {
        struct reader_registry *index;
@@ -218,20 +237,30 @@ void wait_for_quiescent_state(void)
         * Wait for each thread urcu_active_readers count to become 0.
         */
        for (index = registry; index < registry + num_readers; index++) {
+               int wait_loops = 0;
 #ifndef HAS_INCOHERENT_CACHES
-               while (rcu_old_gp_ongoing(index->urcu_active_readers))
-                       cpu_relax();
+               while (rcu_old_gp_ongoing(index->urcu_active_readers)) {
+                       if (wait_loops++ == RCU_QS_ACTIVE_ATTEMPTS) {
+                               wait_gp(index);
+                       } else {
+                               cpu_relax();
+                       }
+               }
 #else /* #ifndef HAS_INCOHERENT_CACHES */
-               int wait_loops = 0;
                /*
                 * BUSY-LOOP. Force the reader thread to commit its
                 * urcu_active_readers update to memory if we wait for too long.
                 */
                while (rcu_old_gp_ongoing(index->urcu_active_readers)) {
-                       if (wait_loops++ == KICK_READER_LOOPS) {
+                       switch (wait_loops++) {
+                       case RCU_QS_ACTIVE_ATTEMPTS:
+                               wait_gp(index);
+                               break;
+                       case KICK_READER_LOOPS:
                                force_mb_single_thread(index);
                                wait_loops = 0;
-                       } else {
+                               break;
+                       default:
                                cpu_relax();
                        }
                }
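A hedged usage sketch of how the new path is exercised end to end. rcu_read_lock(), rcu_read_unlock(), rcu_dereference(), rcu_assign_pointer() and synchronize_rcu() are liburcu's public wrappers for the _rcu_* primitives patched above; thread-registration calls are elided because their names vary across liburcu versions, and consume() is a placeholder.

#include <stdlib.h>
#include <urcu.h>

struct node { int value; };
static struct node *shared_node;

extern void consume(int v);     /* placeholder for real work */

void reader(void)
{
        rcu_read_lock();
        struct node *p = rcu_dereference(shared_node);
        if (p)
                consume(p->value);
        rcu_read_unlock();      /* outermost unlock may now issue a
                                   FUTEX_WAKE for synchronize_rcu() */
}

void updater(struct node *replacement)
{
        struct node *old = shared_node;

        rcu_assign_pointer(shared_node, replacement);
        synchronize_rcu();      /* polls RCU_QS_ACTIVE_ATTEMPTS times per
                                   reader, then sleeps on gp_futex */
        free(old);
}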