/*
* Pointer to array of pointers to per-CPU call_rcu_data structures
- * and # CPUs.
+ * and # CPUs. per_cpu_call_rcu_data is a RCU-protected pointer to an
+ * array of RCU-protected pointers to call_rcu_data. call_rcu acts as a
+ * RCU read-side and reads per_cpu_call_rcu_data and the per-cpu pointer
+ * without mutex. The call_rcu_mutex protects updates.
*/
static struct call_rcu_data **per_cpu_call_rcu_data;
static long maxcpus;
+static void maxcpus_reset(void)
+{
+ maxcpus = 0;
+}
+
/* Allocate the array if it has not already been allocated. */
static void alloc_cpu_call_rcu_data(void)
p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
if (p != NULL) {
memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
- per_cpu_call_rcu_data = p;
+ rcu_set_pointer(&per_cpu_call_rcu_data, p);
} else {
if (!warned) {
fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
static struct call_rcu_data **per_cpu_call_rcu_data = NULL;
static const long maxcpus = -1;
+static void maxcpus_reset(void)
+{
+}
+
static void alloc_cpu_call_rcu_data(void)
{
}
* CPU, returning NULL if there is none. We cannot automatically
* created it because the platform we are running on might not define
* sched_getcpu().
+ *
+ * The call to this function and use of the returned call_rcu_data
+ * should be protected by RCU read-side lock.
*/
struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
{
static int warned = 0;
+ struct call_rcu_data **pcpu_crdp;
- if (per_cpu_call_rcu_data == NULL)
+ pcpu_crdp = rcu_dereference(per_cpu_call_rcu_data);
+ if (pcpu_crdp == NULL)
return NULL;
if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
}
if (cpu < 0 || maxcpus <= cpu)
return NULL;
- return per_cpu_call_rcu_data[cpu];
+ return rcu_dereference(pcpu_crdp[cpu]);
}
/*
* the caller's responsibility to dispose of the removed structure.
* Use get_cpu_call_rcu_data() to obtain a pointer to the old structure
* (prior to NULLing it out, of course).
+ *
+ * The caller must wait for a grace-period to pass between return from
+ * set_cpu_call_rcu_data() and call to call_rcu_data_free() passing the
+ * previous call rcu data as argument.
*/
int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
errno = EINVAL;
return -EINVAL;
}
- call_rcu_unlock(&call_rcu_mutex);
+
if (per_cpu_call_rcu_data == NULL) {
+ call_rcu_unlock(&call_rcu_mutex);
errno = ENOMEM;
return -ENOMEM;
}
- per_cpu_call_rcu_data[cpu] = crdp;
+
+ if (per_cpu_call_rcu_data[cpu] != NULL && crdp != NULL) {
+ call_rcu_unlock(&call_rcu_mutex);
+ errno = EEXIST;
+ return -EEXIST;
+ }
+
+ rcu_set_pointer(&per_cpu_call_rcu_data[cpu], crdp);
+ call_rcu_unlock(&call_rcu_mutex);
return 0;
}
* structure assigned to the CPU on which the thread is running,
* followed by the default call_rcu_data structure. If there is not
* yet a default call_rcu_data structure, one will be created.
+ *
+ * Calls to this function and use of the returned call_rcu_data should
+ * be protected by RCU read-side lock.
*/
struct call_rcu_data *get_call_rcu_data(void)
{
- int curcpu;
- static int warned = 0;
+ struct call_rcu_data *crd;
if (thread_call_rcu_data != NULL)
return thread_call_rcu_data;
- if (maxcpus <= 0)
- return get_default_call_rcu_data();
- curcpu = sched_getcpu();
- if (!warned && (curcpu < 0 || maxcpus <= curcpu)) {
- fprintf(stderr, "[error] liburcu: gcrd CPU # out of range\n");
- warned = 1;
+
+ if (maxcpus > 0) {
+ crd = get_cpu_call_rcu_data(sched_getcpu());
+ if (crd)
+ return crd;
}
- if (curcpu >= 0 && maxcpus > curcpu &&
- per_cpu_call_rcu_data != NULL &&
- per_cpu_call_rcu_data[curcpu] != NULL)
- return per_cpu_call_rcu_data[curcpu];
+
return get_default_call_rcu_data();
}
}
call_rcu_unlock(&call_rcu_mutex);
if ((ret = set_cpu_call_rcu_data(i, crdp)) != 0) {
- /* FIXME: Leaks crdp for now. */
- return ret; /* Can happen on race. */
+ call_rcu_data_free(crdp);
+
+ /* it has been created by other thread */
+ if (ret == -EEXIST)
+ continue;
+
+ return ret;
}
}
return 0;
* need the first invocation of call_rcu() to be fast, make sure
* to create a call_rcu thread first. One way to accomplish this is
* "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
+ *
+ * call_rcu must be called by registered RCU read-side threads.
*/
void call_rcu(struct rcu_head *head,
cds_wfq_node_init(&head->next);
head->func = func;
+ /* Holding rcu read-side lock across use of per-cpu crdp */
+ rcu_read_lock();
crdp = get_call_rcu_data();
cds_wfq_enqueue(&crdp->cbs, &head->next);
uatomic_inc(&crdp->qlen);
wake_call_rcu_thread(crdp);
+ rcu_read_unlock();
}
/*
*
* We also silently refuse to free NULL pointers. This simplifies
* the calling code.
+ *
+ * The caller must wait for a grace-period to pass between return from
+ * set_cpu_call_rcu_data() and call to call_rcu_data_free() passing the
+ * previous call rcu data as argument.
*/
void call_rcu_data_free(struct call_rcu_data *crdp)
{
_CMM_STORE_SHARED(crdp->cbs.head, NULL);
cbs_tail = (struct cds_wfq_node **)
uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
+ /* Create default call rcu data if need be */
+ (void) get_default_call_rcu_data();
cbs_endprev = (struct cds_wfq_node **)
uatomic_xchg(&default_call_rcu_data, cbs_tail);
*cbs_endprev = cbs;
uatomic_add(&default_call_rcu_data->qlen,
uatomic_read(&crdp->qlen));
- cds_list_del(&crdp->list);
- free(crdp);
+ wake_call_rcu_thread(default_call_rcu_data);
}
+
+ call_rcu_lock(&call_rcu_mutex);
+ cds_list_del(&crdp->list);
+ call_rcu_unlock(&call_rcu_mutex);
+
+ free(crdp);
}
/*
void free_all_cpu_call_rcu_data(void)
{
int cpu;
- struct call_rcu_data *crdp;
+ struct call_rcu_data **crdp;
+ static int warned = 0;
if (maxcpus <= 0)
return;
+
+ crdp = malloc(sizeof(*crdp) * maxcpus);
+ if (!crdp) {
+ if (!warned) {
+ fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
+ }
+ warned = 1;
+ return;
+ }
+
for (cpu = 0; cpu < maxcpus; cpu++) {
- crdp = get_cpu_call_rcu_data(cpu);
- if (crdp == NULL)
+ crdp[cpu] = get_cpu_call_rcu_data(cpu);
+ if (crdp[cpu] == NULL)
continue;
set_cpu_call_rcu_data(cpu, NULL);
- call_rcu_data_free(crdp);
}
+ /*
+ * Wait for call_rcu sites acting as RCU readers of the
+ * call_rcu_data to become quiescent.
+ */
+ synchronize_rcu();
+ for (cpu = 0; cpu < maxcpus; cpu++) {
+ if (crdp[cpu] == NULL)
+ continue;
+ call_rcu_data_free(crdp[cpu]);
+ }
+ free(crdp);
}
/*
*/
void call_rcu_after_fork_child(void)
{
- struct call_rcu_data *crdp;
+ struct call_rcu_data *crdp, *next;
/* Release the mutex. */
call_rcu_unlock(&call_rcu_mutex);
+ /* Do nothing when call_rcu() has not been used */
+ if (cds_list_empty(&call_rcu_data_list))
+ return;
+
/*
* Allocate a new default call_rcu_data structure in order
* to get a working call_rcu thread to go with it.
default_call_rcu_data = NULL;
(void)get_default_call_rcu_data();
+ /* Cleanup call_rcu_data pointers before use */
+ maxcpus_reset();
+ free(per_cpu_call_rcu_data);
+ rcu_set_pointer(&per_cpu_call_rcu_data, NULL);
+ thread_call_rcu_data = NULL;
+
/* Dispose of all of the rest of the call_rcu_data structures. */
- while (call_rcu_data_list.next != call_rcu_data_list.prev) {
- crdp = cds_list_entry(call_rcu_data_list.prev,
- struct call_rcu_data, list);
+ cds_list_for_each_entry_safe(crdp, next, &call_rcu_data_list, list) {
if (crdp == default_call_rcu_data)
- crdp = cds_list_entry(crdp->list.prev,
- struct call_rcu_data, list);
+ continue;
uatomic_set(&crdp->flags, URCU_CALL_RCU_STOPPED);
call_rcu_data_free(crdp);
}