From: Mathieu Desnoyers Date: Thu, 29 Sep 2011 21:14:54 +0000 (-0400) Subject: Merge branch 'master' into urcu/ht-shrink X-Git-Tag: v0.7.0~43^2~110 X-Git-Url: http://git.liburcu.org/?a=commitdiff_plain;h=1f689e13ea7e519b1afc001e9c55a7b1b60b599f;hp=a4ce4f122a4e615b6426fa876df38f5c3ea5fb8a;p=userspace-rcu.git Merge branch 'master' into urcu/ht-shrink --- diff --git a/urcu-bp.c b/urcu-bp.c index 2ae3408..4c0ab54 100644 --- a/urcu-bp.c +++ b/urcu-bp.c @@ -39,6 +39,7 @@ #include "urcu/wfqueue.h" #include "urcu/map/urcu-bp.h" #include "urcu/static/urcu-bp.h" +#include "urcu-pointer.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h index 3e947af..d09adb1 100644 --- a/urcu-call-rcu-impl.h +++ b/urcu-call-rcu-impl.h @@ -82,12 +82,20 @@ static struct call_rcu_data *default_call_rcu_data; /* * Pointer to array of pointers to per-CPU call_rcu_data structures - * and # CPUs. + * and # CPUs. per_cpu_call_rcu_data is a RCU-protected pointer to an + * array of RCU-protected pointers to call_rcu_data. call_rcu acts as a + * RCU read-side and reads per_cpu_call_rcu_data and the per-cpu pointer + * without mutex. The call_rcu_mutex protects updates. */ static struct call_rcu_data **per_cpu_call_rcu_data; static long maxcpus; +static void maxcpus_reset(void) +{ + maxcpus = 0; +} + /* Allocate the array if it has not already been allocated. 
*/ static void alloc_cpu_call_rcu_data(void) @@ -104,7 +112,7 @@ static void alloc_cpu_call_rcu_data(void) p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data)); if (p != NULL) { memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data)); - per_cpu_call_rcu_data = p; + rcu_set_pointer(&per_cpu_call_rcu_data, p); } else { if (!warned) { fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n"); @@ -123,6 +131,10 @@ static void alloc_cpu_call_rcu_data(void) static struct call_rcu_data **per_cpu_call_rcu_data = NULL; static const long maxcpus = -1; +static void maxcpus_reset(void) +{ +} + static void alloc_cpu_call_rcu_data(void) { } @@ -321,13 +333,18 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp, * CPU, returning NULL if there is none. We cannot automatically * created it because the platform we are running on might not define * sched_getcpu(). + * + * The call to this function and use of the returned call_rcu_data + * should be protected by RCU read-side lock. 
*/ struct call_rcu_data *get_cpu_call_rcu_data(int cpu) { static int warned = 0; + struct call_rcu_data **pcpu_crdp; - if (per_cpu_call_rcu_data == NULL) + pcpu_crdp = rcu_dereference(per_cpu_call_rcu_data); + if (pcpu_crdp == NULL) return NULL; if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) { fprintf(stderr, "[error] liburcu: get CPU # out of range\n"); @@ -335,7 +352,7 @@ struct call_rcu_data *get_cpu_call_rcu_data(int cpu) } if (cpu < 0 || maxcpus <= cpu) return NULL; - return per_cpu_call_rcu_data[cpu]; + return rcu_dereference(pcpu_crdp[cpu]); } /* @@ -409,7 +426,7 @@ int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp) return -EEXIST; } - per_cpu_call_rcu_data[cpu] = crdp; + rcu_set_pointer(&per_cpu_call_rcu_data[cpu], crdp); call_rcu_unlock(&call_rcu_mutex); return 0; } @@ -441,6 +458,9 @@ struct call_rcu_data *get_default_call_rcu_data(void) * structure assigned to the CPU on which the thread is running, * followed by the default call_rcu_data structure. If there is not * yet a default call_rcu_data structure, one will be created. + * + * Calls to this function and use of the returned call_rcu_data should + * be protected by RCU read-side lock. */ struct call_rcu_data *get_call_rcu_data(void) { @@ -555,6 +575,8 @@ static void wake_call_rcu_thread(struct call_rcu_data *crdp) * need the first invocation of call_rcu() to be fast, make sure * to create a call_rcu thread first. One way to accomplish this is * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data(). + * + * call_rcu must be called by registered RCU read-side threads. 
*/ void call_rcu(struct rcu_head *head, @@ -564,10 +586,13 @@ void call_rcu(struct rcu_head *head, cds_wfq_node_init(&head->next); head->func = func; + /* Holding rcu read-side lock across use of per-cpu crdp */ + rcu_read_lock(); crdp = get_call_rcu_data(); cds_wfq_enqueue(&crdp->cbs, &head->next); uatomic_inc(&crdp->qlen); wake_call_rcu_thread(crdp); + rcu_read_unlock(); } /* @@ -619,7 +644,10 @@ void call_rcu_data_free(struct call_rcu_data *crdp) wake_call_rcu_thread(default_call_rcu_data); } + call_rcu_lock(&call_rcu_mutex); cds_list_del(&crdp->list); + call_rcu_unlock(&call_rcu_mutex); + free(crdp); } @@ -629,17 +657,39 @@ void call_rcu_data_free(struct call_rcu_data *crdp) void free_all_cpu_call_rcu_data(void) { int cpu; - struct call_rcu_data *crdp; + struct call_rcu_data **crdp; + static int warned = 0; if (maxcpus <= 0) return; + + crdp = malloc(sizeof(*crdp) * maxcpus); + if (!crdp) { + if (!warned) { + fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n"); + } + warned = 1; + /* No scratch array: bail out instead of dereferencing NULL in the loops below. */ + return; + } + for (cpu = 0; cpu < maxcpus; cpu++) { - crdp = get_cpu_call_rcu_data(cpu); - if (crdp == NULL) + crdp[cpu] = get_cpu_call_rcu_data(cpu); + if (crdp[cpu] == NULL) continue; set_cpu_call_rcu_data(cpu, NULL); - call_rcu_data_free(crdp); } + /* + * Wait for call_rcu sites acting as RCU readers of the + * call_rcu_data to become quiescent. + */ + synchronize_rcu(); + for (cpu = 0; cpu < maxcpus; cpu++) { + if (crdp[cpu] == NULL) + continue; + call_rcu_data_free(crdp[cpu]); + } + free(crdp); } /* @@ -674,6 +724,10 @@ void call_rcu_after_fork_child(void) /* Release the mutex. */ call_rcu_unlock(&call_rcu_mutex); + /* Do nothing when call_rcu() has not been used */ + if (cds_list_empty(&call_rcu_data_list)) + return; + /* * Allocate a new default call_rcu_data structure in order * to get a working call_rcu thread to go with it.
@@ -681,6 +733,12 @@ void call_rcu_after_fork_child(void) default_call_rcu_data = NULL; (void)get_default_call_rcu_data(); + /* Cleanup call_rcu_data pointers before use */ + maxcpus_reset(); + free(per_cpu_call_rcu_data); + rcu_set_pointer(&per_cpu_call_rcu_data, NULL); + thread_call_rcu_data = NULL; + /* Dispose of all of the rest of the call_rcu_data structures. */ cds_list_for_each_entry_safe(crdp, next, &call_rcu_data_list, list) { if (crdp == default_call_rcu_data) diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h index e76a844..5ea0c23 100644 --- a/urcu-call-rcu.h +++ b/urcu-call-rcu.h @@ -62,16 +62,28 @@ struct rcu_head { /* * Exported functions */ + +/* + * get_cpu_call_rcu_data should be called with RCU read-side lock held. + * Callers should be registered RCU read-side threads. + */ struct call_rcu_data *get_cpu_call_rcu_data(int cpu); pthread_t get_call_rcu_thread(struct call_rcu_data *crdp); struct call_rcu_data *create_call_rcu_data(unsigned long flags, int cpu_affinity); int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp); struct call_rcu_data *get_default_call_rcu_data(void); +/* + * get_call_rcu_data should be called from registered RCU read-side + * threads. + */ struct call_rcu_data *get_call_rcu_data(void); struct call_rcu_data *get_thread_call_rcu_data(void); void set_thread_call_rcu_data(struct call_rcu_data *crdp); int create_all_cpu_call_rcu_data(unsigned long flags); +/* + * call_rcu should be called from registered RCU read-side threads. + */ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *head)); void call_rcu_data_free(struct call_rcu_data *crdp); diff --git a/urcu-defer-impl.h b/urcu-defer-impl.h index d1ab046..34d99c9 100644 --- a/urcu-defer-impl.h +++ b/urcu-defer-impl.h @@ -61,7 +61,9 @@ * Assumes that (void *)-2L is not used often. Used to encode non-aligned * functions and non-aligned data using extra space. * We encode the (void *)-2L fct as: -2L, fct, data. 
- * We encode the (void *)-2L data as: -2L, fct, data. + * We encode the (void *)-2L data as either: + * fct | DQ_FCT_BIT, data (if fct is aligned), or + * -2L, fct, data (if fct is not aligned). * Here, DQ_FCT_MARK == ~DQ_FCT_BIT. Required for the test order. */ #define DQ_FCT_BIT (1 << 0) @@ -122,6 +124,7 @@ static pthread_mutex_t rcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t defer_thread_mutex = PTHREAD_MUTEX_INITIALIZER; static int32_t defer_thread_futex; +static int32_t defer_thread_stop; /* * Written to only by each individual deferer. Read by both the deferer and @@ -148,7 +151,6 @@ static void mutex_lock_defer(pthread_mutex_t *mutex) perror("Error in pthread mutex lock"); exit(-1); } - pthread_testcancel(); poll(NULL,0,10); } #endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */ @@ -186,7 +188,13 @@ static unsigned long rcu_defer_num_callbacks(void) static void wait_defer(void) { uatomic_dec(&defer_thread_futex); - cmm_smp_mb(); /* Write futex before read queue */ + /* Write futex before read queue */ + /* Write futex before read defer_thread_stop */ + cmm_smp_mb(); + if (_CMM_LOAD_SHARED(defer_thread_stop)) { + uatomic_set(&defer_thread_futex, 0); + pthread_exit(0); + } if (rcu_defer_num_callbacks()) { cmm_smp_mb(); /* Read queue before write futex */ /* Callbacks are queued, don't wait. */ @@ -316,14 +324,27 @@ void _defer_rcu(void (*fct)(void *p), void *p) assert(head - CMM_LOAD_SHARED(defer_queue.tail) == 0); } - if (unlikely(defer_queue.last_fct_in != fct)) { + /* + * Encode: + * if the function is not changed and the data is aligned and it is + * not the marker: + * store the data + * otherwise if the function is aligned and its not the marker: + * store the function with DQ_FCT_BIT + * store the data + * otherwise: + * store the marker (DQ_FCT_MARK) + * store the function + * store the data + * + * Decode: see the comments before 'struct defer_queue' + * or the code in rcu_defer_barrier_queue(). 
+ */ + if (unlikely(defer_queue.last_fct_in != fct + || DQ_IS_FCT_BIT(p) + || p == DQ_FCT_MARK)) { defer_queue.last_fct_in = fct; if (unlikely(DQ_IS_FCT_BIT(fct) || fct == DQ_FCT_MARK)) { - /* - * If the function to encode is not aligned or the - * marker, write DQ_FCT_MARK followed by the function - * pointer. - */ _CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], DQ_FCT_MARK); _CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], @@ -333,17 +354,6 @@ void _defer_rcu(void (*fct)(void *p), void *p) _CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], fct); } - } else { - if (unlikely(DQ_IS_FCT_BIT(p) || p == DQ_FCT_MARK)) { - /* - * If the data to encode is not aligned or the marker, - * write DQ_FCT_MARK followed by the function pointer. - */ - _CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], - DQ_FCT_MARK); - _CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], - fct); - } } _CMM_STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], p); cmm_smp_wmb(); /* Publish new pointer before head */ @@ -359,7 +369,6 @@ void _defer_rcu(void (*fct)(void *p), void *p) void *thr_defer(void *args) { for (;;) { - pthread_testcancel(); /* * "Be green". Don't wake up the CPU if there is no RCU work * to perform whatsoever. 
Aims at saving laptop battery life by @@ -396,10 +405,17 @@ static void stop_defer_thread(void) int ret; void *tret; - pthread_cancel(tid_defer); + _CMM_STORE_SHARED(defer_thread_stop, 1); + /* Store defer_thread_stop before testing futex */ + cmm_smp_mb(); wake_up_defer(); + ret = pthread_join(tid_defer, &tret); assert(!ret); + + CMM_STORE_SHARED(defer_thread_stop, 0); + /* defer thread should always exit when futex value is 0 */ + assert(uatomic_read(&defer_thread_futex) == 0); } int rcu_defer_register_thread(void) diff --git a/urcu-qsbr.c b/urcu-qsbr.c index 6b6d3af..5530295 100644 --- a/urcu-qsbr.c +++ b/urcu-qsbr.c @@ -39,6 +39,7 @@ #include "urcu/map/urcu-qsbr.h" #define BUILD_QSBR_LIB #include "urcu/static/urcu-qsbr.h" +#include "urcu-pointer.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE diff --git a/urcu.c b/urcu.c index 20bbf36..77f6888 100644 --- a/urcu.c +++ b/urcu.c @@ -39,6 +39,7 @@ #include "urcu/wfqueue.h" #include "urcu/map/urcu.h" #include "urcu/static/urcu.h" +#include "urcu-pointer.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE