X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu-defer-impl.h;h=f96553365ba4f51035f427798edd2ac4eb981f13;hp=d1ab0466096e5ebccc594658e843eab25d55dac7;hb=a59f39055b5ecb77b68cf78b9839aa9e8e4ec332;hpb=9d2614f07691a813a3c560a6c0bcd0a7be854ed5

diff --git a/urcu-defer-impl.h b/urcu-defer-impl.h
index d1ab046..f965533 100644
--- a/urcu-defer-impl.h
+++ b/urcu-defer-impl.h
@@ -48,6 +48,8 @@
 #include <urcu/uatomic.h>
 #include <urcu/list.h>
 #include <urcu/system.h>
+#include <urcu/tls-compat.h>
+#include "urcu-die.h"
 
 /*
  * Number of entries in the per-thread defer queue. Must be a power of 2.
@@ -61,7 +63,9 @@
  * Assumes that (void *)-2L is not used often. Used to encode non-aligned
  * functions and non-aligned data using extra space.
  * We encode the (void *)-2L fct as: -2L, fct, data.
- * We encode the (void *)-2L data as: -2L, fct, data.
+ * We encode the (void *)-2L data as either:
+ * fct | DQ_FCT_BIT, data (if fct is aligned), or
+ * -2L, fct, data (if fct is not aligned).
  * Here, DQ_FCT_MARK == ~DQ_FCT_BIT. Required for the test order.
  */
 #define DQ_FCT_BIT	(1 << 0)
@@ -80,12 +84,6 @@
  * This is required to permit relinking with newer versions of the library.
  */
 
-#ifdef DEBUG_RCU
-#define rcu_assert(args...)	assert(args)
-#else
-#define rcu_assert(args...)
-#endif
-
 /*
  * defer queue.
  * Contains pointers. Encoded to save space when same callback is often used.
@@ -122,12 +120,13 @@
 static pthread_mutex_t rcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t defer_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
 static int32_t defer_thread_futex;
+static int32_t defer_thread_stop;
 
 /*
  * Written to only by each individual deferer. Read by both the deferer and
  * the reclamation thread.
  */
-static struct defer_queue __thread defer_queue;
+static DEFINE_URCU_TLS(struct defer_queue, defer_queue);
 
 static CDS_LIST_HEAD(registry_defer);
 static pthread_t tid_defer;
@@ -137,19 +136,13 @@ static void mutex_lock_defer(pthread_mutex_t *mutex)
 
 #ifndef DISTRUST_SIGNALS_EXTREME
 	ret = pthread_mutex_lock(mutex);
-	if (ret) {
-		perror("Error in pthread mutex lock");
-		exit(-1);
-	}
+	if (ret)
+		urcu_die(ret);
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
 	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
-		if (ret != EBUSY && ret != EINTR) {
-			printf("ret = %d, errno = %d\n", ret, errno);
-			perror("Error in pthread mutex lock");
-			exit(-1);
-		}
-		pthread_testcancel();
-		poll(NULL,0,10);
+		if (ret != EBUSY && ret != EINTR)
+			urcu_die(ret);
+		(void) poll(NULL,0,10);
 	}
 #endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
 }
@@ -159,10 +152,11 @@ static void mutex_lock_defer(pthread_mutex_t *mutex)
  */
 static void wake_up_defer(void)
 {
-	if (unlikely(uatomic_read(&defer_thread_futex) == -1)) {
+	if (caa_unlikely(uatomic_read(&defer_thread_futex) == -1)) {
 		uatomic_set(&defer_thread_futex, 0);
-		futex_noasync(&defer_thread_futex, FUTEX_WAKE, 1,
-			      NULL, NULL, 0);
+		if (futex_noasync(&defer_thread_futex, FUTEX_WAKE, 1,
+				NULL, NULL, 0) < 0)
+			urcu_die(errno);
 	}
 }
 
@@ -186,16 +180,35 @@ static void wait_defer(void)
 {
 	uatomic_dec(&defer_thread_futex);
-	cmm_smp_mb();	/* Write futex before read queue */
+	/* Write futex before read queue */
+	/* Write futex before read defer_thread_stop */
+	cmm_smp_mb();
+	if (_CMM_LOAD_SHARED(defer_thread_stop)) {
+		uatomic_set(&defer_thread_futex, 0);
+		pthread_exit(0);
+	}
 	if (rcu_defer_num_callbacks()) {
 		cmm_smp_mb();	/* Read queue before write futex */
 		/* Callbacks are queued, don't wait.
 		 */
 		uatomic_set(&defer_thread_futex, 0);
 	} else {
 		cmm_smp_rmb();	/* Read queue before read futex */
-		if (uatomic_read(&defer_thread_futex) == -1)
-			futex_noasync(&defer_thread_futex, FUTEX_WAIT, -1,
-				      NULL, NULL, 0);
+		if (uatomic_read(&defer_thread_futex) != -1)
+			return;
+		while (futex_noasync(&defer_thread_futex, FUTEX_WAIT, -1,
+				NULL, NULL, 0)) {
+			switch (errno) {
+			case EWOULDBLOCK:
+				/* Value already changed. */
+				return;
+			case EINTR:
+				/* Retry if interrupted by signal. */
+				break;	/* Get out of switch. */
+			default:
+				/* Unexpected error. */
+				urcu_die(errno);
+			}
+		}
 	}
 }
@@ -217,11 +230,11 @@ static void rcu_defer_barrier_queue(struct defer_queue *queue,
 	for (i = queue->tail; i != head;) {
 		cmm_smp_rmb();	/* read head before q[]. */
 		p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
-		if (unlikely(DQ_IS_FCT_BIT(p))) {
+		if (caa_unlikely(DQ_IS_FCT_BIT(p))) {
 			DQ_CLEAR_FCT_BIT(p);
 			queue->last_fct_out = p;
 			p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
-		} else if (unlikely(p == DQ_FCT_MARK)) {
+		} else if (caa_unlikely(p == DQ_FCT_MARK)) {
 			p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
 			queue->last_fct_out = p;
 			p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
@@ -237,12 +250,12 @@ static void _rcu_defer_barrier_thread(void)
 {
 	unsigned long head, num_items;
 
-	head = defer_queue.head;
-	num_items = head - defer_queue.tail;
-	if (unlikely(!num_items))
+	head = URCU_TLS(defer_queue).head;
+	num_items = head - URCU_TLS(defer_queue).tail;
+	if (caa_unlikely(!num_items))
 		return;
 	synchronize_rcu();
-	rcu_defer_barrier_queue(&defer_queue, head);
+	rcu_defer_barrier_queue(&URCU_TLS(defer_queue), head);
 }
 
 void rcu_defer_barrier_thread(void)
@@ -278,7 +291,7 @@ void rcu_defer_barrier(void)
 		index->last_head = CMM_LOAD_SHARED(index->head);
 		num_items += index->last_head - index->tail;
 	}
-	if (likely(!num_items)) {
+	if (caa_likely(!num_items)) {
 		/*
 		 * We skip the grace period because there are no queued
 		 * callbacks to execute.
@@ -295,7 +308,7 @@ end:
 /*
  * _defer_rcu - Queue an RCU callback.
  */
-void _defer_rcu(void (*fct)(void *p), void *p)
+static void _defer_rcu(void (*fct)(void *p), void *p)
 {
 	unsigned long head, tail;
 
@@ -303,52 +316,54 @@ void _defer_rcu(void (*fct)(void *p), void *p)
 	 * Head is only modified by ourself. Tail can be modified by reclamation
 	 * thread.
 	 */
-	head = defer_queue.head;
-	tail = CMM_LOAD_SHARED(defer_queue.tail);
+	head = URCU_TLS(defer_queue).head;
+	tail = CMM_LOAD_SHARED(URCU_TLS(defer_queue).tail);
 
 	/*
 	 * If queue is full, or has reached the threshold, empty the queue ourself.
 	 * Worst-case: must allow 2 supplementary entries for fct pointer.
 	 */
-	if (unlikely(head - tail >= DEFER_QUEUE_SIZE - 2)) {
+	if (caa_unlikely(head - tail >= DEFER_QUEUE_SIZE - 2)) {
 		assert(head - tail <= DEFER_QUEUE_SIZE);
 		rcu_defer_barrier_thread();
-		assert(head - CMM_LOAD_SHARED(defer_queue.tail) == 0);
+		assert(head - CMM_LOAD_SHARED(URCU_TLS(defer_queue).tail) == 0);
 	}
 
-	if (unlikely(defer_queue.last_fct_in != fct)) {
-		defer_queue.last_fct_in = fct;
-		if (unlikely(DQ_IS_FCT_BIT(fct) || fct == DQ_FCT_MARK)) {
-			/*
-			 * If the function to encode is not aligned or the
-			 * marker, write DQ_FCT_MARK followed by the function
-			 * pointer.
-			 */
-			_CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
+	/*
+	 * Encode:
+	 * if the function is not changed and the data is aligned and it is
+	 * not the marker:
+	 *	store the data
+	 * otherwise if the function is aligned and it's not the marker:
+	 *	store the function with DQ_FCT_BIT
+	 *	store the data
+	 * otherwise:
+	 *	store the marker (DQ_FCT_MARK)
+	 *	store the function
+	 *	store the data
+	 *
+	 * Decode: see the comments before 'struct defer_queue'
+	 * or the code in rcu_defer_barrier_queue().
+	 */
+	if (caa_unlikely(URCU_TLS(defer_queue).last_fct_in != fct
+			|| DQ_IS_FCT_BIT(p)
+			|| p == DQ_FCT_MARK)) {
+		URCU_TLS(defer_queue).last_fct_in = fct;
+		if (caa_unlikely(DQ_IS_FCT_BIT(fct) || fct == DQ_FCT_MARK)) {
+			_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK],
 					DQ_FCT_MARK);
-			_CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
+			_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK],
 					fct);
 		} else {
 			DQ_SET_FCT_BIT(fct);
-			_CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
-					fct);
-		}
-	} else {
-		if (unlikely(DQ_IS_FCT_BIT(p) || p == DQ_FCT_MARK)) {
-			/*
-			 * If the data to encode is not aligned or the marker,
-			 * write DQ_FCT_MARK followed by the function pointer.
-			 */
-			_CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
-					DQ_FCT_MARK);
-			_CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
+			_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK],
 					fct);
 		}
 	}
-	_CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], p);
+	_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK], p);
 	cmm_smp_wmb();	/* Publish new pointer before head */
 			/* Write q[] before head. */
-	CMM_STORE_SHARED(defer_queue.head, head);
+	CMM_STORE_SHARED(URCU_TLS(defer_queue).head, head);
 	cmm_smp_mb();	/* Write queue head before read futex */
 	/*
 	 * Wake-up any waiting defer thread.
@@ -356,10 +371,9 @@ void _defer_rcu(void (*fct)(void *p), void *p)
 	wake_up_defer();
 }
 
-void *thr_defer(void *args)
+static void *thr_defer(void *args)
 {
 	for (;;) {
-		pthread_testcancel();
 		/*
 		 * "Be green". Don't wake up the CPU if there is no RCU work
 		 * to perform whatsoever. Aims at saving laptop battery life by
@@ -367,7 +381,7 @@ void *thr_defer(void *args)
 		 */
 		wait_defer();
 		/* Sleeping after wait_defer to let many callbacks enqueue */
-		poll(NULL,0,100);	/* wait for 100ms */
+		(void) poll(NULL,0,100);	/* wait for 100ms */
 		rcu_defer_barrier();
 	}
 
@@ -396,26 +410,33 @@ static void stop_defer_thread(void)
 	int ret;
 	void *tret;
 
-	pthread_cancel(tid_defer);
+	_CMM_STORE_SHARED(defer_thread_stop, 1);
+	/* Store defer_thread_stop before testing futex */
+	cmm_smp_mb();
 	wake_up_defer();
+
 	ret = pthread_join(tid_defer, &tret);
 	assert(!ret);
+
+	CMM_STORE_SHARED(defer_thread_stop, 0);
+	/* defer thread should always exit when futex value is 0 */
+	assert(uatomic_read(&defer_thread_futex) == 0);
 }
 
 int rcu_defer_register_thread(void)
 {
 	int was_empty;
 
-	assert(defer_queue.last_head == 0);
-	assert(defer_queue.q == NULL);
-	defer_queue.q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
-	if (!defer_queue.q)
+	assert(URCU_TLS(defer_queue).last_head == 0);
+	assert(URCU_TLS(defer_queue).q == NULL);
+	URCU_TLS(defer_queue).q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
+	if (!URCU_TLS(defer_queue).q)
 		return -ENOMEM;
 
 	mutex_lock_defer(&defer_thread_mutex);
 	mutex_lock_defer(&rcu_defer_mutex);
 	was_empty = cds_list_empty(&registry_defer);
-	cds_list_add(&defer_queue.list, &registry_defer);
+	cds_list_add(&URCU_TLS(defer_queue).list, &registry_defer);
 	mutex_unlock(&rcu_defer_mutex);
 
 	if (was_empty)
@@ -430,10 +451,10 @@ void rcu_defer_unregister_thread(void)
 
 	mutex_lock_defer(&defer_thread_mutex);
 	mutex_lock_defer(&rcu_defer_mutex);
-	cds_list_del(&defer_queue.list);
+	cds_list_del(&URCU_TLS(defer_queue).list);
 	_rcu_defer_barrier_thread();
-	free(defer_queue.q);
-	defer_queue.q = NULL;
+	free(URCU_TLS(defer_queue).q);
+	URCU_TLS(defer_queue).q = NULL;
 	is_empty = cds_list_empty(&registry_defer);
 	mutex_unlock(&rcu_defer_mutex);
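
The encoding rules spelled out twice above (in the DQ_FCT_BIT header comment and in the new Encode/Decode comment inside _defer_rcu()) are compact but easy to misread, so here is a standalone sketch. It is illustrative only, not library code: encode(), q[] and main() are invented for this example, while the DQ_* macros mirror the definitions in this file.

/* Sketch: defer-queue pointer encoding/decoding. Build with: cc sketch.c */
#include <stdio.h>

#define DQ_FCT_BIT		(1 << 0)
#define DQ_IS_FCT_BIT(x)	((unsigned long)(x) & DQ_FCT_BIT)
#define DQ_SET_FCT_BIT(x)	((x) = (void *)((unsigned long)(x) | DQ_FCT_BIT))
#define DQ_CLEAR_FCT_BIT(x)	((x) = (void *)((unsigned long)(x) & ~(unsigned long)DQ_FCT_BIT))
#define DQ_FCT_MARK		((void *)(~(unsigned long)DQ_FCT_BIT))	/* == (void *)-2L */

static void *q[8];	/* stand-in for the per-thread defer queue */

/* Encode one (fct, data) pair, assuming fct changed since the last entry. */
static unsigned long encode(unsigned long head, void *fct, void *data)
{
	if (DQ_IS_FCT_BIT(fct) || fct == DQ_FCT_MARK) {
		/* Unaligned fct (or fct colliding with the marker):
		 * escape it by writing DQ_FCT_MARK first. */
		q[head++] = DQ_FCT_MARK;
		q[head++] = fct;
	} else {
		/* Aligned fct: tag it in place with the low bit. */
		DQ_SET_FCT_BIT(fct);
		q[head++] = fct;
	}
	q[head++] = data;	/* data is always the last word */
	return head;
}

static void print_cb(void *p)
{
	printf("callback ran on: %s\n", (const char *)p);
}

int main(void)
{
	unsigned long head = 0, i = 0;
	void (*last_fct)(void *) = 0;
	void *p;

	head = encode(head, (void *)print_cb, (void *)"hello");

	/* Decode, mirroring rcu_defer_barrier_queue(): a tagged word or a
	 * marker updates last_fct; a plain word is data for last_fct. */
	while (i != head) {
		p = q[i++];
		if (DQ_IS_FCT_BIT(p)) {
			DQ_CLEAR_FCT_BIT(p);
			last_fct = (void (*)(void *))p;
			p = q[i++];
		} else if (p == DQ_FCT_MARK) {
			last_fct = (void (*)(void *))q[i++];
			p = q[i++];
		}
		last_fct(p);
	}
	return 0;
}

This also shows why the header comment says the test order is required: since DQ_FCT_MARK == ~DQ_FCT_BIT, the marker never carries the tag bit, so a decoder may safely test DQ_IS_FCT_BIT() first and compare against DQ_FCT_MARK second, exactly as rcu_defer_barrier_queue() does.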
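
The stop_defer_thread() hunk replaces pthread_cancel() with a cooperative shutdown: store defer_thread_stop, issue a full barrier, wake the worker, then join it, while wait_defer() re-checks the flag after its own barrier. Reduced to portable pthread primitives (a sketch only; worker(), wake and stop are names invented here, and a condition variable stands in for the futex), the shape is:

/* Sketch: cooperative worker shutdown with store-flag / wake / join. */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wake = PTHREAD_COND_INITIALIZER;
static int stop;	/* plays the role of defer_thread_stop */

static void *worker(void *arg)
{
	(void) arg;
	pthread_mutex_lock(&lock);
	for (;;) {
		if (stop)
			break;	/* checked under the lock: no lost wakeup */
		/* ... drain queued work here ... */
		pthread_cond_wait(&wake, &lock);
	}
	pthread_mutex_unlock(&lock);
	return NULL;
}

int main(void)
{
	pthread_t tid;

	pthread_create(&tid, NULL, worker, NULL);

	/* Same order as stop_defer_thread(): publish the stop request,
	 * wake the worker, then join it. */
	pthread_mutex_lock(&lock);
	stop = 1;
	pthread_mutex_unlock(&lock);
	pthread_cond_signal(&wake);
	pthread_join(tid, NULL);

	puts("worker stopped");
	return 0;
}

In the real code the barrier pairing carries the correctness argument: wait_defer() writes the futex before reading defer_thread_stop, and stop_defer_thread() writes defer_thread_stop before wake_up_defer() reads the futex, so at least one side is guaranteed to observe the other's write and the worker cannot sleep through the stop request.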
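
Finally, for context on how this code is reached from applications: _defer_rcu() is the static implementation behind the public defer_rcu() declared in urcu-defer.h. A minimal caller looks like the following sketch (default liburcu flavor assumed, linked with -lurcu; struct node, shared and update() are invented for the example, and error handling is elided):

#include <stdlib.h>
#include <urcu.h>		/* rcu_register_thread(), rcu_xchg_pointer() */
#include <urcu-defer.h>		/* defer_rcu(), rcu_defer_register_thread() */

struct node {
	int value;
};

static struct node *shared;

static void update(int value)
{
	struct node *new, *old;

	new = malloc(sizeof(*new));
	new->value = value;
	old = rcu_xchg_pointer(&shared, new);
	/*
	 * Queue free(old) in this thread's defer queue; the defer worker
	 * thread invokes it after a grace period has elapsed.
	 * free(NULL) on the first update is harmless.
	 */
	defer_rcu(free, old);
}

int main(void)
{
	rcu_register_thread();
	if (rcu_defer_register_thread())
		abort();	/* queue allocation failed */

	update(42);
	update(43);

	rcu_defer_unregister_thread();	/* drains this thread's queue */
	rcu_unregister_thread();
	return 0;
}

Note that rcu_defer_unregister_thread() drains the per-thread queue on the way out (the _rcu_defer_barrier_thread() call visible in the last hunk), so the deferred free() has run by the time it returns.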