X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=urcu-call-rcu.c;h=bb56dbb891fa0a270e7058bd18a47bfe4d554cbf;hb=7106ddf86905188b9e92ee7000165c1af22f793e;hp=5c003aa944e9606ca8e2d27d4572bcefd186f0a8;hpb=b57aee663af988b7f686c076ce6aef2a0d2487c8;p=urcu.git diff --git a/urcu-call-rcu.c b/urcu-call-rcu.c index 5c003aa..bb56dbb 100644 --- a/urcu-call-rcu.c +++ b/urcu-call-rcu.c @@ -36,6 +36,7 @@ #include "urcu/wfqueue.h" #include "urcu-call-rcu.h" #include "urcu-pointer.h" +#include "urcu/list.h" /* Data structure that identifies a call_rcu thread. */ @@ -46,8 +47,16 @@ struct call_rcu_data { pthread_cond_t cond; unsigned long qlen; pthread_t tid; + struct cds_list_head list; } __attribute__((aligned(CAA_CACHE_LINE_SIZE))); +/* + * List of all call_rcu_data structures to keep valgrind happy. + * Protected by call_rcu_mutex. + */ + +CDS_LIST_HEAD(call_rcu_data_list); + /* Link a thread using call_rcu() to its call_rcu thread. */ static __thread struct call_rcu_data *thread_call_rcu_data; @@ -174,6 +183,8 @@ static void *call_rcu_thread(void *arg) } while (cbs != NULL); uatomic_sub(&crdp->qlen, cbcount); } + if (crdp->flags & URCU_CALL_RCU_STOP) + break; if (crdp->flags & URCU_CALL_RCU_RT) poll(NULL, 0, 10); else { @@ -192,15 +203,20 @@ static void *call_rcu_thread(void *arg) call_rcu_unlock(&crdp->mtx); } } - return NULL; /* NOTREACHED */ + call_rcu_lock(&crdp->mtx); + crdp->flags |= URCU_CALL_RCU_STOPPED; + call_rcu_unlock(&crdp->mtx); + return NULL; } /* * Create both a call_rcu thread and the corresponding call_rcu_data - * structure, linking the structure in as specified. + * structure, linking the structure in as specified. Caller must hold + * call_rcu_mutex. */ -void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags) +static void call_rcu_data_init(struct call_rcu_data **crdpp, + unsigned long flags) { struct call_rcu_data *crdp; @@ -221,6 +237,7 @@ void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags) exit(-1); } crdp->flags = flags | URCU_CALL_RCU_RUNNING; + cds_list_add(&crdp->list, &call_rcu_data_list); cmm_smp_mb(); /* Structure initialized before pointer is planted. */ *crdpp = crdp; if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) { @@ -265,7 +282,7 @@ pthread_t get_call_rcu_thread(struct call_rcu_data *crdp) * Create a call_rcu_data structure (with thread) and return a pointer. */ -struct call_rcu_data *create_call_rcu_data(unsigned long flags) +static struct call_rcu_data *__create_call_rcu_data(unsigned long flags) { struct call_rcu_data *crdp; @@ -273,8 +290,23 @@ struct call_rcu_data *create_call_rcu_data(unsigned long flags) return crdp; } +struct call_rcu_data *create_call_rcu_data(unsigned long flags) +{ + struct call_rcu_data *crdp; + + call_rcu_lock(&call_rcu_mutex); + crdp = __create_call_rcu_data(flags); + call_rcu_unlock(&call_rcu_mutex); + return crdp; +} + /* * Set the specified CPU to use the specified call_rcu_data structure. + * + * Use NULL to remove a CPU's call_rcu_data structure, but it is + * the caller's responsibility to dispose of the removed structure. + * Use get_cpu_call_rcu_data() to obtain a pointer to the old structure + * (prior to NULLing it out, of course). */ int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp) @@ -363,6 +395,11 @@ struct call_rcu_data *get_thread_call_rcu_data(void) * Set this task's call_rcu_data structure as specified, regardless * of whether or not this task already had one. (This allows switching * to and from real-time call_rcu threads, for example.) + * + * Use NULL to remove a thread's call_rcu_data structure, but it is + * the caller's responsibility to dispose of the removed structure. + * Use get_thread_call_rcu_data() to obtain a pointer to the old structure + * (prior to NULLing it out, of course). */ void set_thread_call_rcu_data(struct call_rcu_data *crdp) @@ -399,7 +436,7 @@ int create_all_cpu_call_rcu_data(unsigned long flags) call_rcu_unlock(&call_rcu_mutex); continue; } - crdp = create_call_rcu_data(flags); + crdp = __create_call_rcu_data(flags); if (crdp == NULL) { call_rcu_unlock(&call_rcu_mutex); errno = ENOMEM; @@ -414,6 +451,24 @@ int create_all_cpu_call_rcu_data(unsigned long flags) return 0; } +/* + * Wake up the call_rcu thread corresponding to the specified + * call_rcu_data structure. + */ +static void wake_call_rcu_thread(struct call_rcu_data *crdp) +{ + if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) { + call_rcu_lock(&crdp->mtx); + if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) { + if (pthread_cond_signal(&crdp->cond) != 0) { + perror("pthread_cond_signal"); + exit(-1); + } + } + call_rcu_unlock(&crdp->mtx); + } +} + /* * Schedule a function to be invoked after a following grace period. * This is the only function that must be called -- the others are @@ -437,14 +492,102 @@ void call_rcu(struct rcu_head *head, crdp = get_call_rcu_data(); cds_wfq_enqueue(&crdp->cbs, &head->next); uatomic_inc(&crdp->qlen); - if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) { + wake_call_rcu_thread(crdp); +} + +/* + * Free up the specified call_rcu_data structure, terminating the + * associated call_rcu thread. The caller must have previously + * removed the call_rcu_data structure from per-thread or per-CPU + * usage. For example, set_cpu_call_rcu_data(cpu, NULL) for per-CPU + * call_rcu_data structures or set_thread_call_rcu_data(NULL) for + * per-thread call_rcu_data structures. + * + * We silently refuse to free up the default call_rcu_data structure + * because that is where we put any leftover callbacks. Note that + * the possibility of self-spawning callbacks makes it impossible + * to execute all the callbacks in finite time without putting any + * newly spawned callbacks somewhere else. The "somewhere else" of + * last resort is the default call_rcu_data structure. + * + * We also silently refuse to free NULL pointers. This simplifies + * the calling code. + */ +void call_rcu_data_free(struct call_rcu_data *crdp) +{ + struct cds_wfq_node *cbs; + struct cds_wfq_node **cbs_tail; + struct cds_wfq_node **cbs_endprev; + + if (crdp == NULL || crdp == default_call_rcu_data) { + return; + } + if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) { call_rcu_lock(&crdp->mtx); - if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) { - if (pthread_cond_signal(&crdp->cond) != 0) { - perror("pthread_cond_signal"); - exit(-1); - } - } + crdp->flags |= URCU_CALL_RCU_STOP; call_rcu_unlock(&crdp->mtx); + wake_call_rcu_thread(crdp); + while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) + poll(NULL, 0, 1); + } + if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { + while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) + poll(NULL, 0, 1); + _CMM_STORE_SHARED(crdp->cbs.head, NULL); + cbs_tail = (struct cds_wfq_node **) + uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head); + cbs_endprev = (struct cds_wfq_node **) + uatomic_xchg(&default_call_rcu_data, cbs_tail); + *cbs_endprev = cbs; + uatomic_add(&default_call_rcu_data->qlen, + uatomic_read(&crdp->qlen)); + cds_list_del(&crdp->list); + free(crdp); + } +} + +/* + * Clean up all the per-CPU call_rcu threads. + */ +void free_all_cpu_call_rcu_data(void) +{ + int cpu; + struct call_rcu_data *crdp; + + if (maxcpus <= 0) + return; + for (cpu = 0; cpu < maxcpus; cpu++) { + crdp = get_cpu_call_rcu_data(cpu); + if (crdp == NULL) + continue; + set_cpu_call_rcu_data(cpu, NULL); + call_rcu_data_free(crdp); + } +} + +/* + * Clean up call_rcu data structures in the child of a successful fork() + * that is not followed by exec(). + */ +void call_rcu_after_fork_child(void) +{ + struct call_rcu_data *crdp; + + /* + * Allocate a new default call_rcu_data structure in order + * to get a working call_rcu thread to go with it. + */ + default_call_rcu_data = NULL; + (void)get_default_call_rcu_data(); + + /* Dispose of all of the rest of the call_rcu_data structures. */ + while (call_rcu_data_list.next != call_rcu_data_list.prev) { + crdp = cds_list_entry(call_rcu_data_list.prev, + struct call_rcu_data, list); + if (crdp == default_call_rcu_data) + crdp = cds_list_entry(crdp->list.prev, + struct call_rcu_data, list); + crdp->flags = URCU_CALL_RCU_STOPPED; + call_rcu_data_free(crdp); } }