X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu-call-rcu-impl.h;h=38cc00190f018cf1afd7fac07e42bcedec484af3;hp=cfe1cce43618ea218c241fba4413941825b47b43;hb=765f3eadad5647e6fa853414fc652670f9e00966;hpb=c768e45ed336970a42e58e679804f0f455422cd8 diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h index cfe1cce..38cc001 100644 --- a/urcu-call-rcu-impl.h +++ b/urcu-call-rcu-impl.h @@ -26,11 +26,11 @@ #include #include #include +#include #include #include #include #include -#include #include #include @@ -39,16 +39,15 @@ #include "urcu-call-rcu.h" #include "urcu-pointer.h" #include "urcu/list.h" -#include "urcu/urcu-futex.h" +#include "urcu/futex.h" /* Data structure that identifies a call_rcu thread. */ struct call_rcu_data { struct cds_wfq_queue cbs; unsigned long flags; - pthread_mutex_t mtx; - int futex; - unsigned long qlen; + int32_t futex; + unsigned long qlen; /* maintained for debugging. */ pthread_t tid; int cpu_affinity; struct cds_list_head list; @@ -89,26 +88,6 @@ static struct call_rcu_data *default_call_rcu_data; static struct call_rcu_data **per_cpu_call_rcu_data; static long maxcpus; -static void call_rcu_wait(struct call_rcu_data *crdp) -{ - /* Read call_rcu list before read futex */ - cmm_smp_mb(); - if (uatomic_read(&crdp->futex) == -1) - futex_async(&crdp->futex, FUTEX_WAIT, -1, - NULL, NULL, 0); -} - -static void call_rcu_wake_up(struct call_rcu_data *crdp) -{ - /* Write to call_rcu list before reading/writing futex */ - cmm_smp_mb(); - if (unlikely(uatomic_read(&crdp->futex) == -1)) { - uatomic_set(&crdp->futex, 0); - futex_async(&crdp->futex, FUTEX_WAKE, 1, - NULL, NULL, 0); - } -} - /* Allocate the array if it has not already been allocated. */ static void alloc_cpu_call_rcu_data(void) @@ -136,7 +115,12 @@ static void alloc_cpu_call_rcu_data(void) #else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */ -static const struct call_rcu_data **per_cpu_call_rcu_data = NULL; +/* + * per_cpu_call_rcu_data should be constant, but some functions below, used both + * for cases where cpu number is available and not available, assume it it not + * constant. + */ +static struct call_rcu_data **per_cpu_call_rcu_data = NULL; static const long maxcpus = -1; static void alloc_cpu_call_rcu_data(void) @@ -195,6 +179,26 @@ int set_thread_cpu_affinity(struct call_rcu_data *crdp) } #endif +static void call_rcu_wait(struct call_rcu_data *crdp) +{ + /* Read call_rcu list before read futex */ + cmm_smp_mb(); + if (uatomic_read(&crdp->futex) == -1) + futex_async(&crdp->futex, FUTEX_WAIT, -1, + NULL, NULL, 0); +} + +static void call_rcu_wake_up(struct call_rcu_data *crdp) +{ + /* Write to call_rcu list before reading/writing futex */ + cmm_smp_mb(); + if (unlikely(uatomic_read(&crdp->futex) == -1)) { + uatomic_set(&crdp->futex, 0); + futex_async(&crdp->futex, FUTEX_WAKE, 1, + NULL, NULL, 0); + } +} + /* This is the code run by each call_rcu thread. */ static void *call_rcu_thread(void *arg) @@ -204,19 +208,25 @@ static void *call_rcu_thread(void *arg) struct cds_wfq_node **cbs_tail; struct call_rcu_data *crdp = (struct call_rcu_data *)arg; struct rcu_head *rhp; + int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT); if (set_thread_cpu_affinity(crdp) != 0) { perror("pthread_setaffinity_np"); exit(-1); } + /* + * If callbacks take a read-side lock, we need to be registered. + */ + rcu_register_thread(); + thread_call_rcu_data = crdp; + if (!rt) { + uatomic_dec(&crdp->futex); + /* Decrement futex before reading call_rcu list */ + cmm_smp_mb(); + } for (;;) { - if (!(crdp->flags & URCU_CALL_RCU_RT)) { - uatomic_dec(&crdp->futex); - /* Decrement futex before reading call_rcu list */ - cmm_smp_mb(); - } if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) poll(NULL, 0, 1); @@ -240,25 +250,37 @@ static void *call_rcu_thread(void *arg) } while (cbs != NULL); uatomic_sub(&crdp->qlen, cbcount); } - if (crdp->flags & URCU_CALL_RCU_STOP) { - if (!(crdp->flags & URCU_CALL_RCU_RT)) { + if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP) + break; + rcu_thread_offline(); + if (!rt) { + if (&crdp->cbs.head + == _CMM_LOAD_SHARED(crdp->cbs.tail)) { + call_rcu_wait(crdp); + poll(NULL, 0, 10); + uatomic_dec(&crdp->futex); /* - * Read call_rcu list before write futex. + * Decrement futex before reading + * call_rcu list. */ cmm_smp_mb(); - uatomic_set(&crdp->futex, 0); + } else { + poll(NULL, 0, 10); } - break; - } - if (!(crdp->flags & URCU_CALL_RCU_RT)) { - if (&crdp->cbs.head == _CMM_LOAD_SHARED(crdp->cbs.tail)) - call_rcu_wait(crdp); + } else { + poll(NULL, 0, 10); } - poll(NULL, 0, 10); + rcu_thread_online(); } - call_rcu_lock(&crdp->mtx); - crdp->flags |= URCU_CALL_RCU_STOPPED; - call_rcu_unlock(&crdp->mtx); + if (!rt) { + /* + * Read call_rcu list before write futex. + */ + cmm_smp_mb(); + uatomic_set(&crdp->futex, 0); + } + uatomic_or(&crdp->flags, URCU_CALL_RCU_STOPPED); + rcu_unregister_thread(); return NULL; } @@ -282,10 +304,6 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp, memset(crdp, '\0', sizeof(*crdp)); cds_wfq_init(&crdp->cbs); crdp->qlen = 0; - if (pthread_mutex_init(&crdp->mtx, NULL) != 0) { - perror("pthread_mutex_init"); - exit(-1); - } crdp->futex = 0; crdp->flags = flags; cds_list_add(&crdp->list, &call_rcu_data_list); @@ -568,12 +586,10 @@ void call_rcu_data_free(struct call_rcu_data *crdp) if (crdp == NULL || crdp == default_call_rcu_data) { return; } - if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) { - call_rcu_lock(&crdp->mtx); - crdp->flags |= URCU_CALL_RCU_STOP; - call_rcu_unlock(&crdp->mtx); + if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) { + uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP); wake_call_rcu_thread(crdp); - while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) + while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) poll(NULL, 0, 1); } if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { @@ -657,7 +673,7 @@ void call_rcu_after_fork_child(void) if (crdp == default_call_rcu_data) crdp = cds_list_entry(crdp->list.prev, struct call_rcu_data, list); - crdp->flags = URCU_CALL_RCU_STOPPED; + uatomic_set(&crdp->flags, URCU_CALL_RCU_STOPPED); call_rcu_data_free(crdp); } }