X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=urcu-call-rcu-impl.h;h=65f63ee51670ae73b9c8ead9869554e4f8f35a82;hp=f7f0f71eeeb28fec5152a8a81194c141035cd550;hb=7be759d1adf95b168f09e513232ea26de15e1eaf;hpb=63b495d8b20ff76ba08d823df2e9702acc936beb

diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index f7f0f71..65f63ee 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -36,14 +36,19 @@
 #include <sched.h>
 
 #include "config.h"
+#include "compat-getcpu.h"
 #include "urcu/wfcqueue.h"
 #include "urcu-call-rcu.h"
 #include "urcu-pointer.h"
 #include "urcu/list.h"
 #include "urcu/futex.h"
 #include "urcu/tls-compat.h"
+#include "urcu/ref.h"
 #include "urcu-die.h"
 
+#define SET_AFFINITY_CHECK_PERIOD		(1U << 8)	/* 256 */
+#define SET_AFFINITY_CHECK_PERIOD_MASK		(SET_AFFINITY_CHECK_PERIOD - 1)
+
 /* Data structure that identifies a call_rcu thread. */
 
 struct call_rcu_data {
@@ -61,9 +66,21 @@ struct call_rcu_data {
 	unsigned long qlen; /* maintained for debugging. */
 	pthread_t tid;
 	int cpu_affinity;
+	unsigned long gp_count;
 	struct cds_list_head list;
 } __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
 
+struct call_rcu_completion {
+	int barrier_count;
+	int32_t futex;
+	struct urcu_ref ref;
+};
+
+struct call_rcu_completion_work {
+	struct rcu_head head;
+	struct call_rcu_completion *completion;
+};
+
 /*
  * List of all call_rcu_data structures to keep valgrind happy.
  * Protected by call_rcu_mutex.
@@ -90,23 +107,7 @@ static struct call_rcu_data *default_call_rcu_data;
  * CPUs rather than only to specific threads.
  */
 
-#ifdef HAVE_SCHED_GETCPU
-
-static int urcu_sched_getcpu(void)
-{
-	return sched_getcpu();
-}
-
-#else /* #ifdef HAVE_SCHED_GETCPU */
-
-static int urcu_sched_getcpu(void)
-{
-	return -1;
-}
-
-#endif /* #else #ifdef HAVE_SCHED_GETCPU */
-
-#if defined(HAVE_SYSCONF) && defined(HAVE_SCHED_GETCPU)
+#if defined(HAVE_SYSCONF) && (defined(HAVE_SCHED_GETCPU) || defined(HAVE_GETCPUID))
 
 /*
  * Pointer to array of pointers to per-CPU call_rcu_data structures
@@ -191,22 +192,42 @@ static void call_rcu_unlock(pthread_mutex_t *pmp)
 		urcu_die(ret);
 }
 
+/*
+ * Periodically retry setting CPU affinity if we migrate.
+ * Losing affinity can be caused by CPU hotunplug/hotplug, or by
+ * cpuset(7).
+ */
 #if HAVE_SCHED_SETAFFINITY
 static
 int set_thread_cpu_affinity(struct call_rcu_data *crdp)
 {
 	cpu_set_t mask;
+	int ret;
 
 	if (crdp->cpu_affinity < 0)
 		return 0;
+	if (++crdp->gp_count & SET_AFFINITY_CHECK_PERIOD_MASK)
+		return 0;
+	if (urcu_sched_getcpu() == crdp->cpu_affinity)
+		return 0;
 
 	CPU_ZERO(&mask);
 	CPU_SET(crdp->cpu_affinity, &mask);
 #if SCHED_SETAFFINITY_ARGS == 2
-	return sched_setaffinity(0, &mask);
+	ret = sched_setaffinity(0, &mask);
 #else
-	return sched_setaffinity(0, sizeof(mask), &mask);
+	ret = sched_setaffinity(0, sizeof(mask), &mask);
 #endif
+	/*
+	 * EINVAL is fine: can be caused by hotunplugged CPUs, or by
+	 * cpuset(7). This is why we should always retry if we detect
+	 * migration.
+	 */
+	if (ret && errno == EINVAL) {
+		ret = 0;
+		errno = 0;
+	}
+	return ret;
 }
 #else
 static
@@ -220,9 +241,22 @@ static void call_rcu_wait(struct call_rcu_data *crdp)
 {
 	/* Read call_rcu list before read futex */
 	cmm_smp_mb();
-	if (uatomic_read(&crdp->futex) == -1)
-		futex_async(&crdp->futex, FUTEX_WAIT, -1,
-		      NULL, NULL, 0);
+	if (uatomic_read(&crdp->futex) != -1)
+		return;
+	while (futex_async(&crdp->futex, FUTEX_WAIT, -1,
+			NULL, NULL, 0)) {
+		switch (errno) {
+		case EWOULDBLOCK:
+			/* Value already changed. */
+			return;
+		case EINTR:
+			/* Retry if interrupted by signal. */
+			break;	/* Get out of switch. */
+		default:
+			/* Unexpected error. */
+			urcu_die(errno);
+		}
+	}
 }
 
 static void call_rcu_wake_up(struct call_rcu_data *crdp)
@@ -231,8 +265,43 @@ static void call_rcu_wake_up(struct call_rcu_data *crdp)
 	cmm_smp_mb();
 	if (caa_unlikely(uatomic_read(&crdp->futex) == -1)) {
 		uatomic_set(&crdp->futex, 0);
-		futex_async(&crdp->futex, FUTEX_WAKE, 1,
-		      NULL, NULL, 0);
+		if (futex_async(&crdp->futex, FUTEX_WAKE, 1,
+				NULL, NULL, 0) < 0)
+			urcu_die(errno);
+	}
+}
+
+static void call_rcu_completion_wait(struct call_rcu_completion *completion)
+{
+	/* Read completion barrier count before read futex */
+	cmm_smp_mb();
+	if (uatomic_read(&completion->futex) != -1)
+		return;
+	while (futex_async(&completion->futex, FUTEX_WAIT, -1,
+			NULL, NULL, 0)) {
+		switch (errno) {
+		case EWOULDBLOCK:
+			/* Value already changed. */
+			return;
+		case EINTR:
+			/* Retry if interrupted by signal. */
+			break;	/* Get out of switch. */
+		default:
+			/* Unexpected error. */
+			urcu_die(errno);
+		}
+	}
+}
+
+static void call_rcu_completion_wake_up(struct call_rcu_completion *completion)
+{
+	/* Write to completion barrier count before reading/writing futex */
+	cmm_smp_mb();
+	if (caa_unlikely(uatomic_read(&completion->futex) == -1)) {
+		uatomic_set(&completion->futex, 0);
+		if (futex_async(&completion->futex, FUTEX_WAKE, 1,
+				NULL, NULL, 0) < 0)
+			urcu_die(errno);
 	}
 }
 
@@ -243,10 +312,8 @@ static void *call_rcu_thread(void *arg)
 	unsigned long cbcount;
 	struct call_rcu_data *crdp = (struct call_rcu_data *) arg;
 	int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
-	int ret;
 
-	ret = set_thread_cpu_affinity(crdp);
-	if (ret)
+	if (set_thread_cpu_affinity(crdp))
 		urcu_die(errno);
 
 	/*
@@ -266,6 +333,9 @@ static void *call_rcu_thread(void *arg)
 		struct cds_wfcq_node *cbs, *cbs_tmp_n;
 		enum cds_wfcq_ret splice_ret;
 
+		if (set_thread_cpu_affinity(crdp))
+			urcu_die(errno);
+
 		if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSE) {
 			/*
 			 * Pause requested. Become quiescent: remove
@@ -277,7 +347,9 @@ static void *call_rcu_thread(void *arg)
 			cmm_smp_mb__before_uatomic_or();
 			uatomic_or(&crdp->flags, URCU_CALL_RCU_PAUSED);
 			while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSE) != 0)
-				poll(NULL, 0, 1);
+				(void) poll(NULL, 0, 1);
+			uatomic_and(&crdp->flags, ~URCU_CALL_RCU_PAUSED);
+			cmm_smp_mb__after_uatomic_and();
 			rcu_register_thread();
 		}
 
@@ -307,7 +379,7 @@ static void *call_rcu_thread(void *arg)
 			if (cds_wfcq_empty(&crdp->cbs_head,
 					&crdp->cbs_tail)) {
 				call_rcu_wait(crdp);
-				poll(NULL, 0, 10);
+				(void) poll(NULL, 0, 10);
 				uatomic_dec(&crdp->futex);
 				/*
 				 * Decrement futex before reading
@@ -315,10 +387,10 @@ static void *call_rcu_thread(void *arg)
 				 */
 				cmm_smp_mb();
 			} else {
-				poll(NULL, 0, 10);
+				(void) poll(NULL, 0, 10);
 			}
 		} else {
-			poll(NULL, 0, 10);
+			(void) poll(NULL, 0, 10);
 		}
 		rcu_thread_online();
 	}
@@ -357,6 +429,7 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 	crdp->flags = flags;
 	cds_list_add(&crdp->list, &call_rcu_data_list);
 	crdp->cpu_affinity = cpu_affinity;
+	crdp->gp_count = 0;
 	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
 	*crdpp = crdp;
 	ret = pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp);
@@ -604,6 +677,17 @@ static void wake_call_rcu_thread(struct call_rcu_data *crdp)
 		call_rcu_wake_up(crdp);
 }
 
+static void _call_rcu(struct rcu_head *head,
+		      void (*func)(struct rcu_head *head),
+		      struct call_rcu_data *crdp)
+{
+	cds_wfcq_node_init(&head->next);
+	head->func = func;
+	cds_wfcq_enqueue(&crdp->cbs_head, &crdp->cbs_tail, &head->next);
+	uatomic_inc(&crdp->qlen);
+	wake_call_rcu_thread(crdp);
+}
+
 /*
  * Schedule a function to be invoked after a following grace period.
  * This is the only function that must be called -- the others are
@@ -618,21 +702,16 @@ static void wake_call_rcu_thread(struct call_rcu_data *crdp)
  *
  * call_rcu must be called by registered RCU read-side threads.
  */
-
 void call_rcu(struct rcu_head *head,
 	      void (*func)(struct rcu_head *head))
 {
 	struct call_rcu_data *crdp;
 
-	cds_wfcq_node_init(&head->next);
-	head->func = func;
 	/* Holding rcu read-side lock across use of per-cpu crdp */
-	rcu_read_lock();
+	_rcu_read_lock();
 	crdp = get_call_rcu_data();
-	cds_wfcq_enqueue(&crdp->cbs_head, &crdp->cbs_tail, &head->next);
-	uatomic_inc(&crdp->qlen);
-	wake_call_rcu_thread(crdp);
-	rcu_read_unlock();
+	_call_rcu(head, func, crdp);
+	_rcu_read_unlock();
 }
 
 /*
@@ -670,7 +749,7 @@ void call_rcu_data_free(struct call_rcu_data *crdp)
 		uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
 		wake_call_rcu_thread(crdp);
 		while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
-			poll(NULL, 0, 1);
+			(void) poll(NULL, 0, 1);
 	}
 	if (!cds_wfcq_empty(&crdp->cbs_head, &crdp->cbs_tail)) {
 		/* Create default call rcu data if need be */
@@ -730,6 +809,97 @@ void free_all_cpu_call_rcu_data(void)
 	free(crdp);
 }
 
+static
+void free_completion(struct urcu_ref *ref)
+{
+	struct call_rcu_completion *completion;
+
+	completion = caa_container_of(ref, struct call_rcu_completion, ref);
+	free(completion);
+}
+
+static
+void _rcu_barrier_complete(struct rcu_head *head)
+{
+	struct call_rcu_completion_work *work;
+	struct call_rcu_completion *completion;
+
+	work = caa_container_of(head, struct call_rcu_completion_work, head);
+	completion = work->completion;
+	if (!uatomic_sub_return(&completion->barrier_count, 1))
+		call_rcu_completion_wake_up(completion);
+	urcu_ref_put(&completion->ref, free_completion);
+	free(work);
+}
+
+/*
+ * Wait for all in-flight call_rcu callbacks to complete execution.
+ */
+void rcu_barrier(void)
+{
+	struct call_rcu_data *crdp;
+	struct call_rcu_completion *completion;
+	int count = 0;
+	int was_online;
+
+	/* Put in offline state in QSBR. */
+	was_online = _rcu_read_ongoing();
+	if (was_online)
+		rcu_thread_offline();
+	/*
+	 * Calling a rcu_barrier() within a RCU read-side critical
+	 * section is an error.
+	 */
+	if (_rcu_read_ongoing()) {
+		static int warned = 0;
+
+		if (!warned) {
+			fprintf(stderr, "[error] liburcu: rcu_barrier() called from within RCU read-side critical section.\n");
+		}
+		warned = 1;
+		goto online;
+	}
+
+	completion = calloc(sizeof(*completion), 1);
+	if (!completion)
+		urcu_die(errno);
+
+	call_rcu_lock(&call_rcu_mutex);
+	cds_list_for_each_entry(crdp, &call_rcu_data_list, list)
+		count++;
+
+	/* Referenced by rcu_barrier() and each call_rcu thread. */
+	urcu_ref_set(&completion->ref, count + 1);
+	completion->barrier_count = count;
+
+	cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
+		struct call_rcu_completion_work *work;
+
+		work = calloc(sizeof(*work), 1);
+		if (!work)
+			urcu_die(errno);
+		work->completion = completion;
+		_call_rcu(&work->head, _rcu_barrier_complete, crdp);
+	}
+	call_rcu_unlock(&call_rcu_mutex);
+
+	/* Wait for them */
+	for (;;) {
+		uatomic_dec(&completion->futex);
+		/* Decrement futex before reading barrier_count */
+		cmm_smp_mb();
+		if (!uatomic_read(&completion->barrier_count))
+			break;
+		call_rcu_completion_wait(completion);
+	}
+
+	urcu_ref_put(&completion->ref, free_completion);
+
+online:
+	if (was_online)
+		rcu_thread_online();
+}
+
 /*
  * Acquire the call_rcu_mutex in order to ensure that the child sees
  * all of the call_rcu() data structures in a consistent state. Ensure
@@ -749,7 +919,7 @@ void call_rcu_before_fork(void)
 	}
 	cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
 		while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSED) == 0)
-			poll(NULL, 0, 1);
+			(void) poll(NULL, 0, 1);
 	}
 }
 
@@ -764,6 +934,10 @@ void call_rcu_after_fork_parent(void)
 
 	cds_list_for_each_entry(crdp, &call_rcu_data_list, list)
 		uatomic_and(&crdp->flags, ~URCU_CALL_RCU_PAUSE);
+	cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
+		while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSED) != 0)
+			(void) poll(NULL, 0, 1);
+	}
 	call_rcu_unlock(&call_rcu_mutex);
 }
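
Note (not part of the patch above): the headline addition in this diff is rcu_barrier(), which enqueues one completion callback on every call_rcu worker thread and blocks on a futex until all previously queued callbacks have run. The caller-side sketch below is only an illustration under assumed conventions: the struct node, free_node(), remove_node() and teardown() names are invented, and it assumes an application built against the default liburcu flavor with the <urcu.h> and <urcu-call-rcu.h> headers available.

#include <stdlib.h>
#include <urcu.h>		/* default liburcu flavor */
#include <urcu-call-rcu.h>	/* call_rcu(), rcu_barrier(), struct rcu_head */
#include <urcu/compiler.h>	/* caa_container_of() */

struct node {				/* hypothetical RCU-protected element */
	int value;
	struct rcu_head rcu_head;	/* storage used by call_rcu() */
};

static void free_node(struct rcu_head *head)
{
	struct node *node = caa_container_of(head, struct node, rcu_head);

	free(node);	/* runs after a grace period, from a call_rcu thread */
}

static void remove_node(struct node *node)
{
	/*
	 * ... unlink "node" from the RCU-protected structure here ...
	 * The calling thread must be a registered RCU read-side thread
	 * (rcu_register_thread()), as the call_rcu() comment requires.
	 */
	call_rcu(&node->rcu_head, free_node);	/* deferred reclaim */
}

static void teardown(void)
{
	/*
	 * Drain every queued free_node() before, e.g., unloading the
	 * module that provides it. Must not be called from within an
	 * RCU read-side critical section: the patch detects that case,
	 * prints a warning and returns without waiting.
	 */
	rcu_barrier();
}

As in the Linux kernel API of the same name, rcu_barrier() only guarantees completion of callbacks queued before it is invoked, so callers still need their own ordering against concurrent call_rcu() invocations.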