tests: use standard malloc/free for synchronize_rcu()
diff --git a/urcu-qsbr.c b/urcu-qsbr.c
index d3a6849ac04901097783fd0810b7a4aba4bf792c..5b341b5cfd6d97034b08ca5ea6df913a0c8a7736 100644
--- a/urcu-qsbr.c
+++ b/urcu-qsbr.c
@@ -35,7 +35,7 @@
 #include <errno.h>
 #include <poll.h>
 
-#include "urcu/wfqueue.h"
+#include "urcu/wfcqueue.h"
 #include "urcu/map/urcu-qsbr.h"
 #define BUILD_QSBR_LIB
 #include "urcu/static/urcu-qsbr.h"
@@ -53,7 +53,7 @@ void __attribute__((destructor)) rcu_exit(void);
 
 static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;
 
-int32_t gp_futex;
+int32_t rcu_gp_futex;
 
 /*
  * Global grace period counter.
@@ -72,8 +72,8 @@ unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
 DEFINE_URCU_TLS(struct rcu_reader, rcu_reader);
 
 #ifdef DEBUG_YIELD
-unsigned int yield_active;
-DEFINE_URCU_TLS(unsigned int, rand_yield);
+unsigned int rcu_yield_active;
+DEFINE_URCU_TLS(unsigned int, rcu_rand_yield);
 #endif
 
 static CDS_LIST_HEAD(registry);
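
Each reader thread hangs its TLS rcu_reader structure off this registry under
rcu_gp_lock. For orientation, a sketch of the registration path, modeled on
rcu_register_thread() elsewhere in this file (the exact body may differ):

void rcu_register_thread(void)
{
	URCU_TLS(rcu_reader).tid = pthread_self();
	assert(URCU_TLS(rcu_reader).ctr == 0);

	mutex_lock(&rcu_gp_lock);
	/* Make this thread visible to the grace-period machinery. */
	cds_list_add(&URCU_TLS(rcu_reader).node, &registry);
	mutex_unlock(&rcu_gp_lock);
	_rcu_thread_online();
}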
@@ -111,69 +111,66 @@ static void wait_gp(void)
 {
        /* Read reader_gp before read futex */
        cmm_smp_rmb();
-       if (uatomic_read(&gp_futex) == -1)
-               futex_noasync(&gp_futex, FUTEX_WAIT, -1,
+       if (uatomic_read(&rcu_gp_futex) == -1)
+               futex_noasync(&rcu_gp_futex, FUTEX_WAIT, -1,
                      NULL, NULL, 0);
 }
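
wait_gp() blocks only while rcu_gp_futex reads -1, so it must pair with a
reader-side wake-up that clears the futex before waking the waiter. A minimal
sketch of that counterpart, modeled on wake_up_gp() from
urcu/static/urcu-qsbr.h (the exact body in the tree may differ):

static void wake_up_gp(void)
{
	/* Only bother if the writer flagged this reader as waited upon. */
	if (caa_unlikely(_CMM_LOAD_SHARED(URCU_TLS(rcu_reader).waiting))) {
		_CMM_STORE_SHARED(URCU_TLS(rcu_reader).waiting, 0);
		/* Write waiting before reading futex: mirror of wait_gp(). */
		cmm_smp_mb();
		if (uatomic_read(&rcu_gp_futex) != -1)
			return;
		uatomic_set(&rcu_gp_futex, 0);
		/* Wake the grace-period waiter blocked in wait_gp(). */
		futex_noasync(&rcu_gp_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
	}
}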
 
-static void update_counter_and_wait(void)
+static void wait_for_readers(struct cds_list_head *input_readers,
+                       struct cds_list_head *cur_snap_readers,
+                       struct cds_list_head *qsreaders)
 {
-       CDS_LIST_HEAD(qsreaders);
        int wait_loops = 0;
        struct rcu_reader *index, *tmp;
 
-#if (CAA_BITS_PER_LONG < 64)
-       /* Switch parity: 0 -> 1, 1 -> 0 */
-       CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
-#else  /* !(CAA_BITS_PER_LONG < 64) */
-       /* Increment current G.P. */
-       CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
-#endif /* !(CAA_BITS_PER_LONG < 64) */
-
-       /*
-        * Must commit rcu_gp_ctr update to memory before waiting for
-        * quiescent state. Failure to do so could result in the writer
-        * waiting forever while new readers are always accessing data
-        * (no progress). Enforce compiler-order of store to rcu_gp_ctr
-        * before load URCU_TLS(rcu_reader).ctr.
-        */
-       cmm_barrier();
-
-       /*
-        * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
-        * model easier to understand. It does not have a big performance impact
-        * anyway, given this is the write-side.
-        */
-       cmm_smp_mb();
-
        /*
-        * Wait for each thread rcu_reader_qs_gp count to become 0.
+        * Wait for each thread URCU_TLS(rcu_reader).ctr to either
+        * indicate quiescence (offline), or for them to observe the
+        * current rcu_gp_ctr value.
         */
        for (;;) {
                wait_loops++;
                if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
-                       uatomic_set(&gp_futex, -1);
+                       uatomic_set(&rcu_gp_futex, -1);
                        /*
                         * Write futex before write waiting (the other side
                         * reads them in the opposite order).
                         */
                        cmm_smp_wmb();
-                       cds_list_for_each_entry(index, &registry, node) {
+                       cds_list_for_each_entry(index, input_readers, node) {
                                _CMM_STORE_SHARED(index->waiting, 1);
                        }
                        /* Write futex before read reader_gp */
                        cmm_smp_mb();
                }
-               cds_list_for_each_entry_safe(index, tmp, &registry, node) {
-                       if (!rcu_gp_ongoing(&index->ctr))
-                               cds_list_move(&index->node, &qsreaders);
+               cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
+                       switch (rcu_reader_state(&index->ctr)) {
+                       case RCU_READER_ACTIVE_CURRENT:
+                               if (cur_snap_readers) {
+                                       cds_list_move(&index->node,
+                                               cur_snap_readers);
+                                       break;
+                               }
+                               /* Fall-through */
+                       case RCU_READER_INACTIVE:
+                               cds_list_move(&index->node, qsreaders);
+                               break;
+                       case RCU_READER_ACTIVE_OLD:
+                               /*
+                                * Old snapshot. Leaving node in
+                                * input_readers will make us busy-loop
+                                * until the snapshot becomes current or
+                                * the reader becomes inactive.
+                                */
+                               break;
+                       }
                }
 
-               if (cds_list_empty(&registry)) {
+               if (cds_list_empty(input_readers)) {
                        if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                /* Read reader_gp before write futex */
                                cmm_smp_mb();
-                               uatomic_set(&gp_futex, 0);
+                               uatomic_set(&rcu_gp_futex, 0);
                        }
                        break;
                } else {
@@ -188,8 +185,6 @@ static void update_counter_and_wait(void)
                        }
                }
        }
-       /* put back the reader list in the registry */
-       cds_list_splice(&qsreaders, &registry);
 }
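
The three-way classification above relies on rcu_reader_state(), defined in
urcu/static/urcu-qsbr.h. A sketch of the idea, under the QSBR invariants that
an offline reader keeps 0 in its ctr and an online reader periodically copies
rcu_gp_ctr into it (names follow the urcu tree; the exact body may differ):

enum rcu_state {
	RCU_READER_ACTIVE_CURRENT,	/* has observed the current rcu_gp_ctr */
	RCU_READER_ACTIVE_OLD,		/* still holds an old counter snapshot */
	RCU_READER_INACTIVE,		/* offline, hence quiescent */
};

static inline enum rcu_state rcu_reader_state(unsigned long *ctr)
{
	unsigned long v = CMM_LOAD_SHARED(*ctr);

	if (!v)
		return RCU_READER_INACTIVE;	/* moves straight to qsreaders */
	if (v == rcu_gp_ctr)
		return RCU_READER_ACTIVE_CURRENT; /* parked on cur_snap_readers */
	return RCU_READER_ACTIVE_OLD;	/* busy-loop until it catches up */
}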
 
 /*
@@ -200,6 +195,8 @@ static void update_counter_and_wait(void)
 #if (CAA_BITS_PER_LONG < 64)
 void synchronize_rcu(void)
 {
+       CDS_LIST_HEAD(cur_snap_readers);
+       CDS_LIST_HEAD(qsreaders);
        unsigned long was_online;
 
        was_online = URCU_TLS(rcu_reader).ctr;
@@ -223,17 +220,36 @@ void synchronize_rcu(void)
                goto out;
 
        /*
-        * Wait for previous parity to be empty of readers.
+        * Wait for readers to observe original parity or be quiescent.
         */
-       update_counter_and_wait();      /* 0 -> 1, wait readers in parity 0 */
+       wait_for_readers(&registry, &cur_snap_readers, &qsreaders);
 
        /*
-        * Must finish waiting for quiescent state for parity 0 before
-        * committing next rcu_gp_ctr update to memory. Failure to
-        * do so could result in the writer waiting forever while new
+        * Must finish waiting for quiescent state for original parity
+        * before committing next rcu_gp_ctr update to memory. Failure
+        * to do so could result in the writer waiting forever while new
         * readers are always accessing data (no progress).  Enforce
-        * compiler-order of load URCU_TLS(rcu_reader).ctr before store to
-        * rcu_gp_ctr.
+        * compiler-order of load URCU_TLS(rcu_reader).ctr before store
+        * to rcu_gp_ctr.
+        */
+       cmm_barrier();
+
+       /*
+        * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
+        * model easier to understand. It does not have a big performance impact
+        * anyway, given this is the write-side.
+        */
+       cmm_smp_mb();
+
+       /* Switch parity: 0 -> 1, 1 -> 0 */
+       CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+
+       /*
+        * Must commit rcu_gp_ctr update to memory before waiting for
+        * quiescent state. Failure to do so could result in the writer
+        * waiting forever while new readers are always accessing data
+        * (no progress). Enforce compiler-order of store to rcu_gp_ctr
+        * before load URCU_TLS(rcu_reader).ctr.
         */
        cmm_barrier();
 
@@ -245,9 +261,14 @@ void synchronize_rcu(void)
        cmm_smp_mb();
 
        /*
-        * Wait for previous parity to be empty of readers.
+        * Wait for readers to observe new parity or be quiescent.
+        */
+       wait_for_readers(&cur_snap_readers, NULL, &qsreaders);
+
+       /*
+        * Put quiescent reader list back into registry.
         */
-       update_counter_and_wait();      /* 1 -> 0, wait readers in parity 1 */
+       cds_list_splice(&qsreaders, &registry);
 out:
        mutex_unlock(&rcu_gp_lock);
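
The two wait_for_readers() calls above exist because a 32-bit rcu_gp_ctr
cannot be incremented indefinitely without wrapping: a single parity bit is
toggled instead, and readers are waited for in two phases. Readers seen on the
current parity in phase one are parked on cur_snap_readers and only re-checked
after the flip; readers found quiescent move straight to qsreaders. Assuming
constants along the lines of urcu/static/urcu-qsbr.h (an assumption; the
header is authoritative):

#define RCU_GP_ONLINE	(1UL << 0)	/* reader registered and online */
#define RCU_GP_CTR	(1UL << 1)	/* parity bit here; increment step below */

the XOR in this function flips only RCU_GP_CTR, while the 64-bit variant below
can afford a plain rcu_gp_ctr + RCU_GP_CTR increment.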
 
@@ -263,6 +284,7 @@ out:
 #else /* !(CAA_BITS_PER_LONG < 64) */
 void synchronize_rcu(void)
 {
+       CDS_LIST_HEAD(qsreaders);
        unsigned long was_online;
 
        was_online = URCU_TLS(rcu_reader).ctr;
@@ -280,7 +302,35 @@ void synchronize_rcu(void)
        mutex_lock(&rcu_gp_lock);
        if (cds_list_empty(&registry))
                goto out;
-       update_counter_and_wait();
+
+       /* Increment current G.P. */
+       CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+
+       /*
+        * Must commit rcu_gp_ctr update to memory before waiting for
+        * quiescent state. Failure to do so could result in the writer
+        * waiting forever while new readers are always accessing data
+        * (no progress). Enforce compiler-order of store to rcu_gp_ctr
+        * before load URCU_TLS(rcu_reader).ctr.
+        */
+       cmm_barrier();
+
+       /*
+        * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
+        * model easier to understand. It does not have a big performance impact
+        * anyway, given this is the write-side.
+        */
+       cmm_smp_mb();
+
+       /*
+        * Wait for readers to observe new count or be quiescent.
+        */
+       wait_for_readers(&registry, NULL, &qsreaders);
+
+       /*
+        * Put quiescent reader list back into registry.
+        */
+       cds_list_splice(&qsreaders, &registry);
 out:
        mutex_unlock(&rcu_gp_lock);
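
The was_online dance around both variants pairs with the reader-side
quiescent-state report, which is what lets wait_for_readers() observe
progress. A sketch of _rcu_quiescent_state(), modeled on
urcu/static/urcu-qsbr.h (barrier placement in the tree may differ):

static inline void _rcu_quiescent_state(void)
{
	cmm_smp_mb();
	/* Advertise that this reader caught up with the latest counter. */
	_CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr,
			_CMM_LOAD_SHARED(rcu_gp_ctr));
	/* Write ctr before reading futex: pairs with the writer's barriers. */
	cmm_smp_mb();
	wake_up_gp();
	cmm_smp_mb();
}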
 