X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Furcu-call-rcu-impl.h;h=15c1ae60a0eb035e8479a1c1e805e28366f9ec6b;hb=457eeeee4e2694bdd284a186405088f5ccdc6fa9;hp=bfa53f8e09f920a833559ba7b334a04bf3cfcf47;hpb=6893800a4d1cc14dff0395ddcd660a5138db183d;p=urcu.git

diff --git a/src/urcu-call-rcu-impl.h b/src/urcu-call-rcu-impl.h
index bfa53f8..15c1ae6 100644
--- a/src/urcu-call-rcu-impl.h
+++ b/src/urcu-call-rcu-impl.h
@@ -35,14 +35,16 @@
 #include <sched.h>
 
 #include "compat-getcpu.h"
-#include "urcu/wfcqueue.h"
-#include "urcu-call-rcu.h"
-#include "urcu-pointer.h"
-#include "urcu/list.h"
-#include "urcu/futex.h"
-#include "urcu/tls-compat.h"
-#include "urcu/ref.h"
+#include <urcu/wfcqueue.h>
+#include <urcu/call-rcu.h>
+#include <urcu/pointer.h>
+#include <urcu/list.h>
+#include <urcu/futex.h>
+#include <urcu/tls-compat.h>
+#include <urcu/ref.h>
 #include "urcu-die.h"
+#include "urcu-utils.h"
+#include "compat-smp.h"
 
 #define SET_AFFINITY_CHECK_PERIOD		(1U << 8)	/* 256 */
 #define SET_AFFINITY_CHECK_PERIOD_MASK	(SET_AFFINITY_CHECK_PERIOD - 1)
@@ -79,6 +81,10 @@ struct call_rcu_completion_work {
 	struct call_rcu_completion *completion;
 };
 
+enum crdf_flags {
+	CRDF_FLAG_JOIN_THREAD = (1 << 0),
+};
+
 /*
  * List of all call_rcu_data structures to keep valgrind happy.
  * Protected by call_rcu_mutex.
@@ -99,6 +105,8 @@ static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static struct call_rcu_data *default_call_rcu_data;
 
+static struct urcu_atfork *registered_rculfhash_atfork;
+
 /*
  * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
  * available, then we can have call_rcu threads assigned to individual
@@ -116,11 +124,11 @@ static struct call_rcu_data *default_call_rcu_data;
  */
 static struct call_rcu_data **per_cpu_call_rcu_data;
-static long maxcpus;
+static long cpus_array_len;
 
-static void maxcpus_reset(void)
+static void cpus_array_len_reset(void)
 {
-	maxcpus = 0;
+	cpus_array_len = 0;
 }
 
 /* Allocate the array if it has not already been allocated. */
@@ -130,15 +138,15 @@ static void alloc_cpu_call_rcu_data(void)
 	struct call_rcu_data **p;
 	static int warned = 0;
 
-	if (maxcpus != 0)
+	if (cpus_array_len != 0)
 		return;
-	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
-	if (maxcpus <= 0) {
+	cpus_array_len = get_possible_cpus_array_len();
+	if (cpus_array_len <= 0) {
 		return;
 	}
-	p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
+	p = malloc(cpus_array_len * sizeof(*per_cpu_call_rcu_data));
 	if (p != NULL) {
-		memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
+		memset(p, '\0', cpus_array_len * sizeof(*per_cpu_call_rcu_data));
 		rcu_set_pointer(&per_cpu_call_rcu_data, p);
 	} else {
 		if (!warned) {
@@ -156,9 +164,9 @@ static void alloc_cpu_call_rcu_data(void)
  * constant.
  */
 static struct call_rcu_data **per_cpu_call_rcu_data = NULL;
-static const long maxcpus = -1;
+static const long cpus_array_len = -1;
 
-static void maxcpus_reset(void)
+static void cpus_array_len_reset(void)
 {
 }
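The alloc_cpu_call_rcu_data() hunk above stops sizing the per-CPU array with sysconf(_SC_NPROCESSORS_CONF), which can under-count on systems with sparse or hot-pluggable CPU ids, and instead uses the possible-CPUs array length from compat-smp.h. A minimal standalone sketch of that idea on Linux follows; possible_cpus_array_len() is a hypothetical helper, not the library's get_possible_cpus_array_len(), and it only handles the common "0-N" form of the sysfs mask:

#include <stdio.h>
#include <unistd.h>

static long possible_cpus_array_len(void)
{
	FILE *f;
	int hi = -1;

	f = fopen("/sys/devices/system/cpu/possible", "r");
	if (f) {
		int lo;

		/* The file holds a range such as "0-7", or a single "0". */
		if (fscanf(f, "%d-%d", &lo, &hi) >= 1 && hi < 0)
			hi = lo;
		fclose(f);
	}
	if (hi < 0) {
		/* Fallback: configured CPU count, as in the old code. */
		long n = sysconf(_SC_NPROCESSORS_CONF);

		return n > 0 ? n : -1;
	}
	/* The array must span the highest possible CPU id + 1. */
	return (long)hi + 1;
}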
@@ -195,7 +203,7 @@ static void call_rcu_unlock(pthread_mutex_t *pmp)
  * Losing affinity can be caused by CPU hotunplug/hotplug, or by
  * cpuset(7).
  */
-#if HAVE_SCHED_SETAFFINITY
+#ifdef HAVE_SCHED_SETAFFINITY
 static
 int set_thread_cpu_affinity(struct call_rcu_data *crdp)
 {
@@ -229,7 +237,7 @@ int set_thread_cpu_affinity(struct call_rcu_data *crdp)
 }
 #else
 static
-int set_thread_cpu_affinity(struct call_rcu_data *crdp)
+int set_thread_cpu_affinity(struct call_rcu_data *crdp __attribute__((unused)))
 {
 	return 0;
 }
@@ -239,17 +247,25 @@ static void call_rcu_wait(struct call_rcu_data *crdp)
 {
 	/* Read call_rcu list before read futex */
 	cmm_smp_mb();
-	if (uatomic_read(&crdp->futex) != -1)
-		return;
-	while (futex_async(&crdp->futex, FUTEX_WAIT, -1,
-			NULL, NULL, 0)) {
+	while (uatomic_read(&crdp->futex) == -1) {
+		if (!futex_async(&crdp->futex, FUTEX_WAIT, -1, NULL, NULL, 0)) {
+			/*
+			 * Prior wakeups queued by unrelated code
+			 * using the same address can cause futex wait to
+			 * return 0 even though the futex value is still
+			 * -1 (spurious wakeups). Check the value again
+			 * in user-space to validate whether it really
+			 * differs from -1.
+			 */
+			continue;
+		}
 		switch (errno) {
-		case EWOULDBLOCK:
+		case EAGAIN:
 			/* Value already changed. */
 			return;
 		case EINTR:
 			/* Retry if interrupted by signal. */
-			break;	/* Get out of switch. */
+			break;	/* Get out of switch. Check again. */
 		default:
 			/* Unexpected error. */
 			urcu_die(errno);
@@ -273,17 +289,25 @@ static void call_rcu_completion_wait(struct call_rcu_completion *completion)
 {
 	/* Read completion barrier count before read futex */
 	cmm_smp_mb();
-	if (uatomic_read(&completion->futex) != -1)
-		return;
-	while (futex_async(&completion->futex, FUTEX_WAIT, -1,
-			NULL, NULL, 0)) {
+	while (uatomic_read(&completion->futex) == -1) {
+		if (!futex_async(&completion->futex, FUTEX_WAIT, -1, NULL, NULL, 0)) {
+			/*
+			 * Prior wakeups queued by unrelated code
+			 * using the same address can cause futex wait to
+			 * return 0 even though the futex value is still
+			 * -1 (spurious wakeups). Check the value again
+			 * in user-space to validate whether it really
+			 * differs from -1.
+			 */
+			continue;
+		}
 		switch (errno) {
-		case EWOULDBLOCK:
+		case EAGAIN:
 			/* Value already changed. */
 			return;
 		case EINTR:
 			/* Retry if interrupted by signal. */
-			break;	/* Get out of switch. */
+			break;	/* Get out of switch. Check again. */
 		default:
 			/* Unexpected error. */
 			urcu_die(errno);
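Both wait loops rewritten above re-check the futex value whenever futex_async() returns 0, because FUTEX_WAIT may return 0 spuriously when wakeups were queued on the same address by unrelated code; they also expect EAGAIN (equal to EWOULDBLOCK on Linux) for the value-already-changed case. A self-contained sketch of the hardened wait side; futex_wait() here is an illustrative raw-syscall wrapper, not liburcu's futex_async():

#define _GNU_SOURCE
#include <errno.h>
#include <linux/futex.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/syscall.h>
#include <unistd.h>

static int futex_wait(int32_t *uaddr, int32_t expected)
{
	return syscall(SYS_futex, uaddr, FUTEX_WAIT, expected, NULL, NULL, 0);
}

static void wait_while_minus_one(int32_t *futex)
{
	/* Re-check the value after every return from FUTEX_WAIT. */
	while (__atomic_load_n(futex, __ATOMIC_SEQ_CST) == -1) {
		if (!futex_wait(futex, -1))
			continue;	/* 0 may be a spurious wakeup: re-check. */
		switch (errno) {
		case EAGAIN:		/* Value already differed from -1. */
			return;
		case EINTR:		/* Interrupted by a signal: retry. */
			break;
		default:		/* Unexpected error. */
			abort();
		}
	}
}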
@@ -428,8 +452,8 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 	cds_list_add(&crdp->list, &call_rcu_data_list);
 	crdp->cpu_affinity = cpu_affinity;
 	crdp->gp_count = 0;
-	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
-	*crdpp = crdp;
+	rcu_set_pointer(crdpp, crdp);
+
 	ret = pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp);
 	if (ret)
 		urcu_die(ret);
@@ -453,14 +477,16 @@ struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
 
 	pcpu_crdp = rcu_dereference(per_cpu_call_rcu_data);
 	if (pcpu_crdp == NULL)
 		return NULL;
-	if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
+	if (!warned && cpus_array_len > 0 && (cpu < 0 || cpus_array_len <= cpu)) {
 		fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
 		warned = 1;
 	}
-	if (cpu < 0 || maxcpus <= cpu)
+	if (cpu < 0 || cpus_array_len <= cpu)
 		return NULL;
 	return rcu_dereference(pcpu_crdp[cpu]);
 }
+URCU_ATTR_ALIAS(urcu_stringify(get_cpu_call_rcu_data))
+struct call_rcu_data *alias_get_cpu_call_rcu_data();
 
 /*
  * Return the tid corresponding to the call_rcu thread whose
@@ -471,6 +497,8 @@ pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
 {
 	return crdp->tid;
 }
+URCU_ATTR_ALIAS(urcu_stringify(get_call_rcu_thread))
+pthread_t alias_get_call_rcu_thread();
 
 /*
  * Create a call_rcu_data structure (with thread) and return a pointer.
@@ -485,6 +513,8 @@ static struct call_rcu_data *__create_call_rcu_data(unsigned long flags,
 	return crdp;
 }
+URCU_ATTR_ALIAS(urcu_stringify(create_call_rcu_data))
+struct call_rcu_data *alias_create_call_rcu_data();
 
 struct call_rcu_data *create_call_rcu_data(unsigned long flags,
 					   int cpu_affinity)
 {
@@ -515,7 +545,7 @@ int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
 
 	call_rcu_lock(&call_rcu_mutex);
 	alloc_cpu_call_rcu_data();
-	if (cpu < 0 || maxcpus <= cpu) {
+	if (cpu < 0 || cpus_array_len <= cpu) {
 		if (!warned) {
 			fprintf(stderr, "[error] liburcu: set CPU # out of range\n");
 			warned = 1;
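In call_rcu_data_init() above, the explicit cmm_smp_mb() followed by a plain pointer store is replaced by rcu_set_pointer(), which pairs the required write barrier with the publication so that readers using rcu_dereference() always observe a fully initialized structure. A minimal sketch of the same publish pattern with a hypothetical structure (illustrative, not library code):

#include <stdlib.h>
#include <urcu/pointer.h>	/* rcu_set_pointer(), rcu_dereference() */

struct conf {
	int threshold;
	int verbose;
};

static struct conf *active_conf;

static void publish_conf(int threshold, int verbose)
{
	struct conf *c = malloc(sizeof(*c));

	if (!c)
		abort();
	c->threshold = threshold;
	c->verbose = verbose;
	/*
	 * Publish only after every field is initialized: a reader that
	 * does rcu_dereference(active_conf) inside an RCU read-side
	 * critical section then sees a complete structure.
	 */
	rcu_set_pointer(&active_conf, c);
}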
@@ -541,26 +571,35 @@ int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
 	call_rcu_unlock(&call_rcu_mutex);
 	return 0;
 }
+URCU_ATTR_ALIAS(urcu_stringify(set_cpu_call_rcu_data))
+int alias_set_cpu_call_rcu_data();
 
 /*
  * Return a pointer to the default call_rcu_data structure, creating
- * one if need be.  Because we never free call_rcu_data structures,
- * we don't need to be in an RCU read-side critical section.
+ * one if need be.
+ *
+ * The call to this function with intent to use the returned
+ * call_rcu_data should be protected by an RCU read-side lock.
  */
 struct call_rcu_data *get_default_call_rcu_data(void)
 {
-	if (default_call_rcu_data != NULL)
-		return rcu_dereference(default_call_rcu_data);
+	struct call_rcu_data *crdp;
+
+	crdp = rcu_dereference(default_call_rcu_data);
+	if (crdp != NULL)
+		return crdp;
+
 	call_rcu_lock(&call_rcu_mutex);
-	if (default_call_rcu_data != NULL) {
-		call_rcu_unlock(&call_rcu_mutex);
-		return default_call_rcu_data;
-	}
-	call_rcu_data_init(&default_call_rcu_data, 0, -1);
+	if (default_call_rcu_data == NULL)
+		call_rcu_data_init(&default_call_rcu_data, 0, -1);
+	crdp = default_call_rcu_data;
 	call_rcu_unlock(&call_rcu_mutex);
-	return default_call_rcu_data;
+
+	return crdp;
 }
+URCU_ATTR_ALIAS(urcu_stringify(get_default_call_rcu_data))
+struct call_rcu_data *alias_get_default_call_rcu_data();
 
 /*
  * Return the call_rcu_data structure that applies to the currently
@@ -580,7 +619,7 @@ struct call_rcu_data *get_call_rcu_data(void)
 	if (URCU_TLS(thread_call_rcu_data) != NULL)
 		return URCU_TLS(thread_call_rcu_data);
 
-	if (maxcpus > 0) {
+	if (cpus_array_len > 0) {
 		crd = get_cpu_call_rcu_data(urcu_sched_getcpu());
 		if (crd)
 			return crd;
@@ -588,6 +627,8 @@ struct call_rcu_data *get_call_rcu_data(void)
 
 	return get_default_call_rcu_data();
 }
+URCU_ATTR_ALIAS(urcu_stringify(get_call_rcu_data))
+struct call_rcu_data *alias_get_call_rcu_data();
 
 /*
  * Return a pointer to this task's call_rcu_data if there is one.
@@ -597,6 +638,8 @@ struct call_rcu_data *get_thread_call_rcu_data(void)
 {
 	return URCU_TLS(thread_call_rcu_data);
 }
+URCU_ATTR_ALIAS(urcu_stringify(get_thread_call_rcu_data))
+struct call_rcu_data *alias_get_thread_call_rcu_data();
 
 /*
  * Set this task's call_rcu_data structure as specified, regardless
@@ -613,6 +656,8 @@ void set_thread_call_rcu_data(struct call_rcu_data *crdp)
 {
 	URCU_TLS(thread_call_rcu_data) = crdp;
 }
+URCU_ATTR_ALIAS(urcu_stringify(set_thread_call_rcu_data))
+void alias_set_thread_call_rcu_data();
 
 /*
  * Create a separate call_rcu thread for each CPU.  This does not
@@ -631,7 +676,7 @@ int create_all_cpu_call_rcu_data(unsigned long flags)
 	call_rcu_lock(&call_rcu_mutex);
 	alloc_cpu_call_rcu_data();
 	call_rcu_unlock(&call_rcu_mutex);
-	if (maxcpus <= 0) {
+	if (cpus_array_len <= 0) {
 		errno = EINVAL;
 		return -EINVAL;
 	}
@@ -639,7 +684,7 @@ int create_all_cpu_call_rcu_data(unsigned long flags)
 		errno = ENOMEM;
 		return -ENOMEM;
 	}
-	for (i = 0; i < maxcpus; i++) {
+	for (i = 0; i < cpus_array_len; i++) {
 		call_rcu_lock(&call_rcu_mutex);
 		if (get_cpu_call_rcu_data(i)) {
 			call_rcu_unlock(&call_rcu_mutex);
@@ -664,6 +709,8 @@ int create_all_cpu_call_rcu_data(unsigned long flags)
 	}
 	return 0;
 }
+URCU_ATTR_ALIAS(urcu_stringify(create_all_cpu_call_rcu_data))
+int alias_create_all_cpu_call_rcu_data();
 
 /*
  * Wake up the call_rcu thread corresponding to the specified
@@ -711,6 +758,7 @@ void call_rcu(struct rcu_head *head,
 	_call_rcu(head, func, crdp);
 	_rcu_read_unlock();
 }
+URCU_ATTR_ALIAS(urcu_stringify(call_rcu)) void alias_call_rcu();
 
 /*
  * Free up the specified call_rcu_data structure, terminating the
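call_rcu() above queues a callback that is invoked after a grace period, on whichever worker get_call_rcu_data() selects (per-thread, per-CPU, or default). Typical caller-side usage looks like the following sketch; struct node and its functions are hypothetical, while the liburcu calls are the real public API:

#include <stdlib.h>
#include <urcu.h>		/* default flavor; brings in call_rcu() */
#include <urcu/compiler.h>	/* caa_container_of() */

struct node {
	int value;
	struct rcu_head rcu;	/* storage for the deferred-free callback */
};

static void free_node(struct rcu_head *head)
{
	struct node *n = caa_container_of(head, struct node, rcu);

	free(n);
}

/*
 * Called after `n` has been unlinked from all RCU-visible structures,
 * so no new reader can find it; pre-existing readers may still hold it.
 */
static void retire_node(struct node *n)
{
	call_rcu(&n->rcu, free_node);	/* free_node() runs after a grace period */
}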
@@ -738,7 +786,8 @@ void call_rcu(struct rcu_head *head,
  * a list corruption bug in the 0.7.x series.  The equivalent fix
  * appeared in 0.6.8 for the stable-0.6 branch.
  */
-void call_rcu_data_free(struct call_rcu_data *crdp)
+static
+void _call_rcu_data_free(struct call_rcu_data *crdp, unsigned int flags)
 {
 	if (crdp == NULL || crdp == default_call_rcu_data) {
 		return;
@@ -749,9 +798,13 @@ void call_rcu_data_free(struct call_rcu_data *crdp)
 		while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
 			(void) poll(NULL, 0, 1);
 	}
+	call_rcu_lock(&call_rcu_mutex);
 	if (!cds_wfcq_empty(&crdp->cbs_head, &crdp->cbs_tail)) {
-		/* Create default call rcu data if need be */
+		call_rcu_unlock(&call_rcu_mutex);
+		/* Create default call rcu data if need be. */
+		/* CBs queued here will be handed to the default list. */
 		(void) get_default_call_rcu_data();
+		call_rcu_lock(&call_rcu_mutex);
 		__cds_wfcq_splice_blocking(&default_call_rcu_data->cbs_head,
 			&default_call_rcu_data->cbs_tail,
 			&crdp->cbs_head, &crdp->cbs_tail);
@@ -760,12 +813,25 @@ void call_rcu_data_free(struct call_rcu_data *crdp)
 		wake_call_rcu_thread(default_call_rcu_data);
 	}
 
-	call_rcu_lock(&call_rcu_mutex);
 	cds_list_del(&crdp->list);
 	call_rcu_unlock(&call_rcu_mutex);
 
+	if (flags & CRDF_FLAG_JOIN_THREAD) {
+		int ret;
+
+		ret = pthread_join(get_call_rcu_thread(crdp), NULL);
+		if (ret)
+			urcu_die(ret);
+	}
 	free(crdp);
 }
+URCU_ATTR_ALIAS(urcu_stringify(call_rcu_data_free))
+void alias_call_rcu_data_free();
+
+void call_rcu_data_free(struct call_rcu_data *crdp)
+{
+	_call_rcu_data_free(crdp, CRDF_FLAG_JOIN_THREAD);
+}
 
 /*
  * Clean up all the per-CPU call_rcu threads.
@@ -776,10 +842,10 @@ void free_all_cpu_call_rcu_data(void)
 	struct call_rcu_data **crdp;
 	static int warned = 0;
 
-	if (maxcpus <= 0)
+	if (cpus_array_len <= 0)
 		return;
 
-	crdp = malloc(sizeof(*crdp) * maxcpus);
+	crdp = malloc(sizeof(*crdp) * cpus_array_len);
 	if (!crdp) {
 		if (!warned) {
 			fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
@@ -788,7 +854,7 @@ void free_all_cpu_call_rcu_data(void)
 		return;
 	}
 
-	for (cpu = 0; cpu < maxcpus; cpu++) {
+	for (cpu = 0; cpu < cpus_array_len; cpu++) {
 		crdp[cpu] = get_cpu_call_rcu_data(cpu);
 		if (crdp[cpu] == NULL)
 			continue;
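Rather than dropping callbacks, _call_rcu_data_free() above splices whatever is left on the dying worker's wait-free concurrent queue onto the default worker. The sketch below shows that hand-off in isolation using the public wfcqueue API; hand_off_callbacks() and its queues are hypothetical, and it uses the self-locking splice variant rather than __cds_wfcq_splice_blocking(), which requires the caller to hold an external lock, as the patch does with call_rcu_mutex:

#include <urcu/wfcqueue.h>

/*
 * Move every node from src onto the tail of dst. After the splice,
 * src is empty and dst owns the nodes; blocked enqueuers are waited
 * for, so no callback is lost.
 */
static void hand_off_callbacks(struct cds_wfcq_head *dst_head,
		struct cds_wfcq_tail *dst_tail,
		struct cds_wfcq_head *src_head,
		struct cds_wfcq_tail *src_tail)
{
	enum cds_wfcq_ret ret;

	ret = cds_wfcq_splice_blocking(dst_head, dst_tail,
			src_head, src_tail);
	if (ret == CDS_WFCQ_RET_SRC_EMPTY)
		return;	/* nothing was queued on src */
}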
@@ -799,13 +865,23 @@ void free_all_cpu_call_rcu_data(void)
 	 * call_rcu_data to become quiescent.
 	 */
 	synchronize_rcu();
-	for (cpu = 0; cpu < maxcpus; cpu++) {
+	for (cpu = 0; cpu < cpus_array_len; cpu++) {
 		if (crdp[cpu] == NULL)
 			continue;
 		call_rcu_data_free(crdp[cpu]);
 	}
 	free(crdp);
 }
+#ifdef RCU_QSBR
+/* ABI6 has a non-namespaced free_all_cpu_call_rcu_data for qsbr */
+#undef free_all_cpu_call_rcu_data
+URCU_ATTR_ALIAS("urcu_qsbr_free_all_cpu_call_rcu_data")
+void free_all_cpu_call_rcu_data();
+#define free_all_cpu_call_rcu_data urcu_qsbr_free_all_cpu_call_rcu_data
+#else
+URCU_ATTR_ALIAS(urcu_stringify(free_all_cpu_call_rcu_data))
+void alias_free_all_cpu_call_rcu_data();
+#endif
 
 static void
 free_completion(struct urcu_ref *ref)
@@ -897,6 +973,8 @@ online:
 	if (was_online)
 		rcu_thread_online();
 }
+URCU_ATTR_ALIAS(urcu_stringify(rcu_barrier))
+void alias_rcu_barrier();
 
 /*
  * Acquire the call_rcu_mutex in order to ensure that the child sees
@@ -907,9 +985,14 @@ online:
 void call_rcu_before_fork(void)
 {
 	struct call_rcu_data *crdp;
+	struct urcu_atfork *atfork;
 
 	call_rcu_lock(&call_rcu_mutex);
 
+	atfork = registered_rculfhash_atfork;
+	if (atfork)
+		atfork->before_fork(atfork->priv);
+
 	cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
 		uatomic_or(&crdp->flags, URCU_CALL_RCU_PAUSE);
 		cmm_smp_mb__after_uatomic_or();
@@ -920,6 +1003,8 @@ void call_rcu_before_fork(void)
 			(void) poll(NULL, 0, 1);
 	}
 }
+URCU_ATTR_ALIAS(urcu_stringify(call_rcu_before_fork))
+void alias_call_rcu_before_fork();
 
 /*
  * Clean up call_rcu data structures in the parent of a successful fork()
@@ -929,6 +1014,7 @@ void call_rcu_before_fork(void)
 void call_rcu_after_fork_parent(void)
 {
 	struct call_rcu_data *crdp;
+	struct urcu_atfork *atfork;
 
 	cds_list_for_each_entry(crdp, &call_rcu_data_list, list)
 		uatomic_and(&crdp->flags, ~URCU_CALL_RCU_PAUSE);
@@ -936,8 +1022,13 @@ void call_rcu_after_fork_parent(void)
 		while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSED) != 0)
 			(void) poll(NULL, 0, 1);
 	}
+	atfork = registered_rculfhash_atfork;
+	if (atfork)
+		atfork->after_fork_parent(atfork->priv);
 	call_rcu_unlock(&call_rcu_mutex);
 }
+URCU_ATTR_ALIAS(urcu_stringify(call_rcu_after_fork_parent))
+void alias_call_rcu_after_fork_parent();
 
 /*
  * Clean up call_rcu data structures in the child of a successful fork()
@@ -947,10 +1038,15 @@ void call_rcu_after_fork_parent(void)
 void call_rcu_after_fork_child(void)
 {
 	struct call_rcu_data *crdp, *next;
+	struct urcu_atfork *atfork;
 
 	/* Release the mutex. */
 	call_rcu_unlock(&call_rcu_mutex);
 
+	atfork = registered_rculfhash_atfork;
+	if (atfork)
+		atfork->after_fork_child(atfork->priv);
+
 	/* Do nothing when call_rcu() has not been used */
 	if (cds_list_empty(&call_rcu_data_list))
 		return;
@@ -963,7 +1059,7 @@ void call_rcu_after_fork_child(void)
 	(void)get_default_call_rcu_data();
 
 	/* Cleanup call_rcu_data pointers before use */
-	maxcpus_reset();
+	cpus_array_len_reset();
 	free(per_cpu_call_rcu_data);
 	rcu_set_pointer(&per_cpu_call_rcu_data, NULL);
 	URCU_TLS(thread_call_rcu_data) = NULL;
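The hunks above extend liburcu's three public fork hooks so they also drive a registered rculfhash atfork handler. An application that mixes call_rcu() with fork() typically wires the hooks up once, as in this illustrative sketch (registering from a GCC-style constructor is an assumption; the three handler names are the real entry points):

#include <pthread.h>
#include <urcu.h>	/* declares the call_rcu fork hooks */

/*
 * Illustrative registration; an application could equally call
 * pthread_atfork() early in main().
 */
static void __attribute__((constructor)) register_rcu_fork_handlers(void)
{
	(void) pthread_atfork(call_rcu_before_fork,
			call_rcu_after_fork_parent,
			call_rcu_after_fork_child);
}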
+ */ + _call_rcu_data_free(crdp, 0); + } +} +URCU_ATTR_ALIAS(urcu_stringify(call_rcu_after_fork_child)) +void alias_call_rcu_after_fork_child(); + +void urcu_register_rculfhash_atfork(struct urcu_atfork *atfork) +{ + if (CMM_LOAD_SHARED(registered_rculfhash_atfork)) + return; + call_rcu_lock(&call_rcu_mutex); + if (!registered_rculfhash_atfork) + registered_rculfhash_atfork = atfork; + call_rcu_unlock(&call_rcu_mutex); +} +URCU_ATTR_ALIAS(urcu_stringify(urcu_register_rculfhash_atfork)) +void alias_urcu_register_rculfhash_atfork(); + +/* + * This unregistration function is deprecated, meant only for internal + * use by rculfhash. + */ +__attribute__((noreturn)) +void urcu_unregister_rculfhash_atfork(struct urcu_atfork *atfork __attribute__((unused))) +{ + urcu_die(EPERM); +} + +URCU_ATTR_ALIAS(urcu_stringify(urcu_unregister_rculfhash_atfork)) +__attribute__((noreturn)) +void alias_urcu_unregister_rculfhash_atfork(); + +/* + * Teardown the default call_rcu worker thread if there are no queued + * callbacks on process exit. This prevents leaking memory. + * + * Here is how an application can ensure graceful teardown of this + * worker thread: + * + * - An application queuing call_rcu callbacks should invoke + * rcu_barrier() before it exits. + * - When chaining call_rcu callbacks, the number of calls to + * rcu_barrier() on application exit must match at least the maximum + * number of chained callbacks. + * - If an application chains callbacks endlessly, it would have to be + * modified to stop chaining callbacks when it detects an application + * exit (e.g. with a flag), and wait for quiescence with rcu_barrier() + * after setting that flag. + * - The statements above apply to a library which queues call_rcu + * callbacks, only it needs to invoke rcu_barrier in its library + * destructor. + * + * Note that this function does not presume it is being called when the + * application is single-threaded even though this is invoked from a + * destructor: this function synchronizes against concurrent calls to + * get_default_call_rcu_data(). + */ +static void urcu_call_rcu_exit(void) +{ + struct call_rcu_data *crdp; + bool teardown = true; + + if (default_call_rcu_data == NULL) + return; + call_rcu_lock(&call_rcu_mutex); + /* + * If the application leaves callbacks in the default call_rcu + * worker queue, keep the default worker in place. + */ + crdp = default_call_rcu_data; + if (!crdp) { + teardown = false; + goto unlock; + } + if (!cds_wfcq_empty(&crdp->cbs_head, &crdp->cbs_tail)) { + teardown = false; + goto unlock; + } + rcu_set_pointer(&default_call_rcu_data, NULL); +unlock: + call_rcu_unlock(&call_rcu_mutex); + if (teardown) { + synchronize_rcu(); call_rcu_data_free(crdp); } }