X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu-call-rcu-impl.h;h=ae93468d8beb4e9fb9167a4dcc5b462892f3d9a2;hp=9fa6aa663e157a226bd0d28cc05566b5b4dcc6bf;hb=53a55535da7db6ac6a2f37107baed9130c049333;hpb=5e77fc1f94c3572fca067d37667d74f8165e2434 diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h index 9fa6aa6..ae93468 100644 --- a/urcu-call-rcu-impl.h +++ b/urcu-call-rcu-impl.h @@ -20,33 +20,36 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#define _GNU_SOURCE #include #include #include #include #include +#include #include #include #include #include -#include #include +#include #include "config.h" #include "urcu/wfqueue.h" #include "urcu-call-rcu.h" #include "urcu-pointer.h" #include "urcu/list.h" +#include "urcu/futex.h" /* Data structure that identifies a call_rcu thread. */ struct call_rcu_data { struct cds_wfq_queue cbs; unsigned long flags; - pthread_mutex_t mtx; - pthread_cond_t cond; - unsigned long qlen; + int32_t futex; + unsigned long qlen; /* maintained for debugging. */ pthread_t tid; + int cpu_affinity; struct cds_list_head list; } __attribute__((aligned(CAA_CACHE_LINE_SIZE))); @@ -112,7 +115,12 @@ static void alloc_cpu_call_rcu_data(void) #else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */ -static const struct call_rcu_data **per_cpu_call_rcu_data = NULL; +/* + * per_cpu_call_rcu_data should be constant, but some functions below, used both + * for cases where cpu number is available and not available, assume it it not + * constant. + */ +static struct call_rcu_data **per_cpu_call_rcu_data = NULL; static const long maxcpus = -1; static void alloc_cpu_call_rcu_data(void) @@ -146,6 +154,51 @@ static void call_rcu_unlock(pthread_mutex_t *pmp) } } +#if HAVE_SCHED_SETAFFINITY +static +int set_thread_cpu_affinity(struct call_rcu_data *crdp) +{ + cpu_set_t mask; + + if (crdp->cpu_affinity < 0) + return 0; + + CPU_ZERO(&mask); + CPU_SET(crdp->cpu_affinity, &mask); +#if SCHED_SETAFFINITY_ARGS == 2 + return sched_setaffinity(0, &mask); +#else + return sched_setaffinity(0, sizeof(mask), &mask); +#endif +} +#else +static +int set_thread_cpu_affinity(struct call_rcu_data *crdp) +{ + return 0; +} +#endif + +static void call_rcu_wait(struct call_rcu_data *crdp) +{ + /* Read call_rcu list before read futex */ + cmm_smp_mb(); + if (uatomic_read(&crdp->futex) == -1) + futex_async(&crdp->futex, FUTEX_WAIT, -1, + NULL, NULL, 0); +} + +static void call_rcu_wake_up(struct call_rcu_data *crdp) +{ + /* Write to call_rcu list before reading/writing futex */ + cmm_smp_mb(); + if (unlikely(uatomic_read(&crdp->futex) == -1)) { + uatomic_set(&crdp->futex, 0); + futex_async(&crdp->futex, FUTEX_WAKE, 1, + NULL, NULL, 0); + } +} + /* This is the code run by each call_rcu thread. */ static void *call_rcu_thread(void *arg) @@ -155,8 +208,24 @@ static void *call_rcu_thread(void *arg) struct cds_wfq_node **cbs_tail; struct call_rcu_data *crdp = (struct call_rcu_data *)arg; struct rcu_head *rhp; + int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT); + + if (set_thread_cpu_affinity(crdp) != 0) { + perror("pthread_setaffinity_np"); + exit(-1); + } + + /* + * If callbacks take a read-side lock, we need to be registered. + */ + rcu_register_thread(); thread_call_rcu_data = crdp; + if (!rt) { + uatomic_dec(&crdp->futex); + /* Decrement futex before reading call_rcu list */ + cmm_smp_mb(); + } for (;;) { if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) @@ -181,29 +250,37 @@ static void *call_rcu_thread(void *arg) } while (cbs != NULL); uatomic_sub(&crdp->qlen, cbcount); } - if (crdp->flags & URCU_CALL_RCU_STOP) + if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP) break; - if (crdp->flags & URCU_CALL_RCU_RT) - poll(NULL, 0, 10); - else { - call_rcu_lock(&crdp->mtx); - _CMM_STORE_SHARED(crdp->flags, - crdp->flags & ~URCU_CALL_RCU_RUNNING); - if (&crdp->cbs.head == - _CMM_LOAD_SHARED(crdp->cbs.tail) && - pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) { - perror("pthread_cond_wait"); - exit(-1); + rcu_thread_offline(); + if (!rt) { + if (&crdp->cbs.head + == _CMM_LOAD_SHARED(crdp->cbs.tail)) { + call_rcu_wait(crdp); + poll(NULL, 0, 10); + uatomic_dec(&crdp->futex); + /* + * Decrement futex before reading + * call_rcu list. + */ + cmm_smp_mb(); + } else { + poll(NULL, 0, 10); } - _CMM_STORE_SHARED(crdp->flags, - crdp->flags | URCU_CALL_RCU_RUNNING); + } else { poll(NULL, 0, 10); - call_rcu_unlock(&crdp->mtx); } + rcu_thread_online(); + } + if (!rt) { + /* + * Read call_rcu list before write futex. + */ + cmm_smp_mb(); + uatomic_set(&crdp->futex, 0); } - call_rcu_lock(&crdp->mtx); - crdp->flags |= URCU_CALL_RCU_STOPPED; - call_rcu_unlock(&crdp->mtx); + uatomic_or(&crdp->flags, URCU_CALL_RCU_STOPPED); + rcu_unregister_thread(); return NULL; } @@ -214,7 +291,8 @@ static void *call_rcu_thread(void *arg) */ static void call_rcu_data_init(struct call_rcu_data **crdpp, - unsigned long flags) + unsigned long flags, + int cpu_affinity) { struct call_rcu_data *crdp; @@ -226,16 +304,10 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp, memset(crdp, '\0', sizeof(*crdp)); cds_wfq_init(&crdp->cbs); crdp->qlen = 0; - if (pthread_mutex_init(&crdp->mtx, NULL) != 0) { - perror("pthread_mutex_init"); - exit(-1); - } - if (pthread_cond_init(&crdp->cond, NULL) != 0) { - perror("pthread_cond_init"); - exit(-1); - } - crdp->flags = flags | URCU_CALL_RCU_RUNNING; + crdp->futex = 0; + crdp->flags = flags; cds_list_add(&crdp->list, &call_rcu_data_list); + crdp->cpu_affinity = cpu_affinity; cmm_smp_mb(); /* Structure initialized before pointer is planted. */ *crdpp = crdp; if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) { @@ -280,20 +352,22 @@ pthread_t get_call_rcu_thread(struct call_rcu_data *crdp) * Create a call_rcu_data structure (with thread) and return a pointer. */ -static struct call_rcu_data *__create_call_rcu_data(unsigned long flags) +static struct call_rcu_data *__create_call_rcu_data(unsigned long flags, + int cpu_affinity) { struct call_rcu_data *crdp; - call_rcu_data_init(&crdp, flags); + call_rcu_data_init(&crdp, flags, cpu_affinity); return crdp; } -struct call_rcu_data *create_call_rcu_data(unsigned long flags) +struct call_rcu_data *create_call_rcu_data(unsigned long flags, + int cpu_affinity) { struct call_rcu_data *crdp; call_rcu_lock(&call_rcu_mutex); - crdp = __create_call_rcu_data(flags); + crdp = __create_call_rcu_data(flags, cpu_affinity); call_rcu_unlock(&call_rcu_mutex); return crdp; } @@ -309,9 +383,10 @@ struct call_rcu_data *create_call_rcu_data(unsigned long flags) int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp) { - int warned = 0; + static int warned = 0; call_rcu_lock(&call_rcu_mutex); + alloc_cpu_call_rcu_data(); if (cpu < 0 || maxcpus <= cpu) { if (!warned) { fprintf(stderr, "[error] liburcu: set CPU # out of range\n"); @@ -321,13 +396,21 @@ int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp) errno = EINVAL; return -EINVAL; } - alloc_cpu_call_rcu_data(); - call_rcu_unlock(&call_rcu_mutex); + if (per_cpu_call_rcu_data == NULL) { + call_rcu_unlock(&call_rcu_mutex); errno = ENOMEM; return -ENOMEM; } + + if (per_cpu_call_rcu_data[cpu] != NULL && crdp != NULL) { + call_rcu_unlock(&call_rcu_mutex); + errno = EEXIST; + return -EEXIST; + } + per_cpu_call_rcu_data[cpu] = crdp; + call_rcu_unlock(&call_rcu_mutex); return 0; } @@ -346,7 +429,7 @@ struct call_rcu_data *get_default_call_rcu_data(void) call_rcu_unlock(&call_rcu_mutex); return default_call_rcu_data; } - call_rcu_data_init(&default_call_rcu_data, 0); + call_rcu_data_init(&default_call_rcu_data, 0, -1); call_rcu_unlock(&call_rcu_mutex); return default_call_rcu_data; } @@ -361,22 +444,17 @@ struct call_rcu_data *get_default_call_rcu_data(void) */ struct call_rcu_data *get_call_rcu_data(void) { - int curcpu; - static int warned = 0; + struct call_rcu_data *crd; if (thread_call_rcu_data != NULL) return thread_call_rcu_data; - if (maxcpus <= 0) - return get_default_call_rcu_data(); - curcpu = sched_getcpu(); - if (!warned && (curcpu < 0 || maxcpus <= curcpu)) { - fprintf(stderr, "[error] liburcu: gcrd CPU # out of range\n"); - warned = 1; + + if (maxcpus > 0) { + crd = get_cpu_call_rcu_data(sched_getcpu()); + if (crd) + return crd; } - if (curcpu >= 0 && maxcpus > curcpu && - per_cpu_call_rcu_data != NULL && - per_cpu_call_rcu_data[curcpu] != NULL) - return per_cpu_call_rcu_data[curcpu]; + return get_default_call_rcu_data(); } @@ -408,7 +486,9 @@ void set_thread_call_rcu_data(struct call_rcu_data *crdp) /* * Create a separate call_rcu thread for each CPU. This does not * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data() - * function if you want that behavior. + * function if you want that behavior. Should be paired with + * free_all_cpu_call_rcu_data() to teardown these call_rcu worker + * threads. */ int create_all_cpu_call_rcu_data(unsigned long flags) @@ -434,7 +514,7 @@ int create_all_cpu_call_rcu_data(unsigned long flags) call_rcu_unlock(&call_rcu_mutex); continue; } - crdp = __create_call_rcu_data(flags); + crdp = __create_call_rcu_data(flags, i); if (crdp == NULL) { call_rcu_unlock(&call_rcu_mutex); errno = ENOMEM; @@ -455,16 +535,8 @@ int create_all_cpu_call_rcu_data(unsigned long flags) */ static void wake_call_rcu_thread(struct call_rcu_data *crdp) { - if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) { - call_rcu_lock(&crdp->mtx); - if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) { - if (pthread_cond_signal(&crdp->cond) != 0) { - perror("pthread_cond_signal"); - exit(-1); - } - } - call_rcu_unlock(&crdp->mtx); - } + if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) + call_rcu_wake_up(crdp); } /* @@ -520,12 +592,10 @@ void call_rcu_data_free(struct call_rcu_data *crdp) if (crdp == NULL || crdp == default_call_rcu_data) { return; } - if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) { - call_rcu_lock(&crdp->mtx); - crdp->flags |= URCU_CALL_RCU_STOP; - call_rcu_unlock(&crdp->mtx); + if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) { + uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP); wake_call_rcu_thread(crdp); - while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) + while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) poll(NULL, 0, 1); } if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { @@ -539,9 +609,11 @@ void call_rcu_data_free(struct call_rcu_data *crdp) *cbs_endprev = cbs; uatomic_add(&default_call_rcu_data->qlen, uatomic_read(&crdp->qlen)); - cds_list_del(&crdp->list); - free(crdp); + wake_call_rcu_thread(default_call_rcu_data); } + + cds_list_del(&crdp->list); + free(crdp); } /* @@ -590,7 +662,7 @@ void call_rcu_after_fork_parent(void) */ void call_rcu_after_fork_child(void) { - struct call_rcu_data *crdp; + struct call_rcu_data *crdp, *next; /* Release the mutex. */ call_rcu_unlock(&call_rcu_mutex); @@ -603,13 +675,10 @@ void call_rcu_after_fork_child(void) (void)get_default_call_rcu_data(); /* Dispose of all of the rest of the call_rcu_data structures. */ - while (call_rcu_data_list.next != call_rcu_data_list.prev) { - crdp = cds_list_entry(call_rcu_data_list.prev, - struct call_rcu_data, list); + cds_list_for_each_entry_safe(crdp, next, &call_rcu_data_list, list) { if (crdp == default_call_rcu_data) - crdp = cds_list_entry(crdp->list.prev, - struct call_rcu_data, list); - crdp->flags = URCU_CALL_RCU_STOPPED; + continue; + uatomic_set(&crdp->flags, URCU_CALL_RCU_STOPPED); call_rcu_data_free(crdp); } }