X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu.c;h=475bebf526dfd1b9cc8cf78b87d8804ded7a1b6f;hp=9542b264a6dbf53d36d2fb63e7578b4e7ff2b2f5;hb=2d6debff95ad695255d2ea9d590d1e418590b238;hpb=9598a4814c854780e9ca9bb2cfff8d77442c3db6 diff --git a/urcu.c b/urcu.c index 9542b26..475bebf 100644 --- a/urcu.c +++ b/urcu.c @@ -23,9 +23,14 @@ pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER; * Global grace period counter. * Contains the current RCU_GP_CTR_BIT. * Also has a RCU_GP_CTR_BIT of 1, to accelerate the reader fast path. + * Written to only by writer with mutex taken. Read by both writer and readers. */ long urcu_gp_ctr = RCU_GP_COUNT; +/* + * Written to only by each individual reader. Read by both the reader and the + * writers. + */ long __thread urcu_active_readers; /* Thread IDs of registered readers */ @@ -43,16 +48,23 @@ unsigned int __thread rand_yield; static struct reader_data *reader_data; static int num_readers, alloc_readers; +#ifndef DEBUG_FULL_MB static int sig_done; +#endif void internal_urcu_lock(void) { +#if 0 int ret; + /* Mutex sleeping does not play well with busy-waiting loop. */ ret = pthread_mutex_lock(&urcu_mutex); if (ret) { perror("Error in pthread mutex lock"); exit(-1); } +#endif + while (pthread_mutex_trylock(&urcu_mutex) != 0) + cpu_relax(); } void internal_urcu_unlock(void) @@ -71,42 +83,70 @@ void internal_urcu_unlock(void) */ static void switch_next_urcu_qparity(void) { - urcu_gp_ctr ^= RCU_GP_CTR_BIT; + STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_CTR_BIT); } #ifdef DEBUG_FULL_MB +static void force_mb_single_thread(pthread_t tid) +{ + smp_mb(); +} + static void force_mb_all_threads(void) { - mb(); + smp_mb(); } #else + +static void force_mb_single_thread(pthread_t tid) +{ + assert(reader_data); + sig_done = 0; + /* + * pthread_kill has a smp_mb(). But beware, we assume it performs + * a cache flush on architectures with non-coherent cache. Let's play + * safe and don't assume anything : we use smp_mc() to make sure the + * cache flush is enforced. + * smp_mb(); write sig_done before sending the signals + */ + smp_mc(); /* write sig_done before sending the signals */ + pthread_kill(tid, SIGURCU); + /* + * Wait for sighandler (and thus mb()) to execute on every thread. + * BUSY-LOOP. + */ + while (LOAD_SHARED(sig_done) < 1) + cpu_relax(); + smp_mb(); /* read sig_done before ending the barrier */ +} + static void force_mb_all_threads(void) { struct reader_data *index; /* - * Ask for each threads to execute a mb() so we can consider the + * Ask for each threads to execute a smp_mb() so we can consider the * compiler barriers around rcu read lock as real memory barriers. */ if (!reader_data) return; - debug_yield_write(); sig_done = 0; - debug_yield_write(); - mb(); /* write sig_done before sending the signals */ - debug_yield_write(); - for (index = reader_data; index < reader_data + num_readers; index++) { + /* + * pthread_kill has a smp_mb(). But beware, we assume it performs + * a cache flush on architectures with non-coherent cache. Let's play + * safe and don't assume anything : we use smp_mc() to make sure the + * cache flush is enforced. + * smp_mb(); write sig_done before sending the signals + */ + smp_mc(); /* write sig_done before sending the signals */ + for (index = reader_data; index < reader_data + num_readers; index++) pthread_kill(index->tid, SIGURCU); - debug_yield_write(); - } /* * Wait for sighandler (and thus mb()) to execute on every thread. * BUSY-LOOP. */ - while (sig_done < num_readers) - barrier(); - debug_yield_write(); - mb(); /* read sig_done before ending the barrier */ - debug_yield_write(); + while (LOAD_SHARED(sig_done) < num_readers) + cpu_relax(); + smp_mb(); /* read sig_done before ending the barrier */ } #endif @@ -116,79 +156,80 @@ void wait_for_quiescent_state(void) if (!reader_data) return; - /* Wait for each thread urcu_active_readers count to become 0. + /* + * Wait for each thread urcu_active_readers count to become 0. */ for (index = reader_data; index < reader_data + num_readers; index++) { + int wait_loops = 0; /* - * BUSY-LOOP. + * BUSY-LOOP. Force the reader thread to commit its + * urcu_active_readers update to memory if we wait for too long. */ - while (rcu_old_gp_ongoing(index->urcu_active_readers)) - barrier(); + while (rcu_old_gp_ongoing(index->urcu_active_readers)) { + if (wait_loops++ == KICK_READER_LOOPS) { + force_mb_single_thread(index->tid); + wait_loops = 0; + } else { + cpu_relax(); + } + } } } void synchronize_rcu(void) { + internal_urcu_lock(); + /* All threads should read qparity before accessing data structure - * where new ptr points to. */ + * where new ptr points to. Must be done within internal_urcu_lock + * because it iterates on reader threads.*/ /* Write new ptr before changing the qparity */ force_mb_all_threads(); - debug_yield_write(); - - internal_urcu_lock(); - debug_yield_write(); switch_next_urcu_qparity(); /* 0 -> 1 */ - debug_yield_write(); /* * Must commit qparity update to memory before waiting for parity * 0 quiescent state. Failure to do so could result in the writer * waiting forever while new readers are always accessing data (no * progress). + * Ensured by STORE_SHARED and LOAD_SHARED. */ - mb(); /* * Wait for previous parity to be empty of readers. */ wait_for_quiescent_state(); /* Wait readers in parity 0 */ - debug_yield_write(); /* * Must finish waiting for quiescent state for parity 0 before * committing qparity update to memory. Failure to do so could result in * the writer waiting forever while new readers are always accessing * data (no progress). + * Ensured by STORE_SHARED and LOAD_SHARED. */ - mb(); switch_next_urcu_qparity(); /* 1 -> 0 */ - debug_yield_write(); /* * Must commit qparity update to memory before waiting for parity * 1 quiescent state. Failure to do so could result in the writer * waiting forever while new readers are always accessing data (no * progress). + * Ensured by STORE_SHARED and LOAD_SHARED. */ - mb(); /* * Wait for previous parity to be empty of readers. */ wait_for_quiescent_state(); /* Wait readers in parity 1 */ - debug_yield_write(); - - internal_urcu_unlock(); - debug_yield_write(); - /* All threads should finish using the data referred to by old ptr - * before decrementing their urcu_active_readers count */ /* Finish waiting for reader threads before letting the old ptr being - * freed. */ + * freed. Must be done within internal_urcu_lock because it iterates on + * reader threads. */ force_mb_all_threads(); - debug_yield_write(); + + internal_urcu_unlock(); } void urcu_add_reader(pthread_t id) @@ -256,7 +297,12 @@ void urcu_unregister_thread(void) #ifndef DEBUG_FULL_MB void sigurcu_handler(int signo, siginfo_t *siginfo, void *context) { - mb(); + /* + * Executing this smp_mb() is the only purpose of this signal handler. + * It punctually promotes barrier() into smp_mb() on every thread it is + * executed on. + */ + smp_mb(); atomic_inc(&sig_done); }