summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
60af049)
A concurrent get_cpu_call_rcu_data(), called by get_call_rcu_data(),
could dereference this pointer without holding any mutex. So this
situation would happen if we have a concurrent call_rcu() executing
while we do the create_all_cpu_call_rcu_data().
I think we would need to put a rcu_dereference() around
per_cpu_call_rcu_data read within get_cpu_call_rcu_data() too.
per_cpu_call_rcu_data should be done with rcu_set_pointer.
Also, a rcu read-side critical section would be required around any
usage of per_cpu_call_rcu_data, and the action of tearing down the
per-cpu data would require to wait for a quiescent state. So we would
basically require that the call_rcu users need to be registered as
RCU reader threads.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
#include "urcu/wfqueue.h"
#include "urcu/map/urcu-bp.h"
#include "urcu/static/urcu-bp.h"
#include "urcu/wfqueue.h"
#include "urcu/map/urcu-bp.h"
#include "urcu/static/urcu-bp.h"
+#include "urcu-pointer.h"
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
/*
* Pointer to array of pointers to per-CPU call_rcu_data structures
/*
* Pointer to array of pointers to per-CPU call_rcu_data structures
+ * and # CPUs. per_cpu_call_rcu_data is a RCU-protected pointer to an
+ * array of RCU-protected pointers to call_rcu_data. call_rcu acts as a
+ * RCU read-side and reads per_cpu_call_rcu_data and the per-cpu pointer
+ * without mutex. The call_rcu_mutex protects updates.
*/
static struct call_rcu_data **per_cpu_call_rcu_data;
*/
static struct call_rcu_data **per_cpu_call_rcu_data;
p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
if (p != NULL) {
memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
if (p != NULL) {
memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
- per_cpu_call_rcu_data = p;
+ rcu_set_pointer(&per_cpu_call_rcu_data, p);
} else {
if (!warned) {
fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
} else {
if (!warned) {
fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
* CPU, returning NULL if there is none. We cannot automatically
* created it because the platform we are running on might not define
* sched_getcpu().
* CPU, returning NULL if there is none. We cannot automatically
* created it because the platform we are running on might not define
* sched_getcpu().
+ *
+ * The call to this function and use of the returned call_rcu_data
+ * should be protected by RCU read-side lock.
*/
struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
{
static int warned = 0;
*/
struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
{
static int warned = 0;
+ struct call_rcu_data **pcpu_crdp;
- if (per_cpu_call_rcu_data == NULL)
+ pcpu_crdp = rcu_dereference(per_cpu_call_rcu_data);
+ if (pcpu_crdp == NULL)
return NULL;
if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
return NULL;
if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
}
if (cpu < 0 || maxcpus <= cpu)
return NULL;
}
if (cpu < 0 || maxcpus <= cpu)
return NULL;
- return per_cpu_call_rcu_data[cpu];
+ return rcu_dereference(pcpu_crdp[cpu]);
- per_cpu_call_rcu_data[cpu] = crdp;
+ rcu_set_pointer(&per_cpu_call_rcu_data[cpu], crdp);
call_rcu_unlock(&call_rcu_mutex);
return 0;
}
call_rcu_unlock(&call_rcu_mutex);
return 0;
}
* structure assigned to the CPU on which the thread is running,
* followed by the default call_rcu_data structure. If there is not
* yet a default call_rcu_data structure, one will be created.
* structure assigned to the CPU on which the thread is running,
* followed by the default call_rcu_data structure. If there is not
* yet a default call_rcu_data structure, one will be created.
+ *
+ * Calls to this function and use of the returned call_rcu_data should
+ * be protected by RCU read-side lock.
*/
struct call_rcu_data *get_call_rcu_data(void)
{
*/
struct call_rcu_data *get_call_rcu_data(void)
{
* need the first invocation of call_rcu() to be fast, make sure
* to create a call_rcu thread first. One way to accomplish this is
* "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
* need the first invocation of call_rcu() to be fast, make sure
* to create a call_rcu thread first. One way to accomplish this is
* "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
+ *
+ * call_rcu must be called by registered RCU read-side threads.
*/
void call_rcu(struct rcu_head *head,
*/
void call_rcu(struct rcu_head *head,
cds_wfq_node_init(&head->next);
head->func = func;
cds_wfq_node_init(&head->next);
head->func = func;
+ /* Holding rcu read-side lock across use of per-cpu crdp */
+ rcu_read_lock();
crdp = get_call_rcu_data();
cds_wfq_enqueue(&crdp->cbs, &head->next);
uatomic_inc(&crdp->qlen);
wake_call_rcu_thread(crdp);
crdp = get_call_rcu_data();
cds_wfq_enqueue(&crdp->cbs, &head->next);
uatomic_inc(&crdp->qlen);
wake_call_rcu_thread(crdp);
void free_all_cpu_call_rcu_data(void)
{
int cpu;
void free_all_cpu_call_rcu_data(void)
{
int cpu;
- struct call_rcu_data *crdp;
+ struct call_rcu_data **crdp;
+ static int warned = 0;
if (maxcpus <= 0)
return;
if (maxcpus <= 0)
return;
+
+ crdp = malloc(sizeof(*crdp) * maxcpus);
+ if (!crdp) {
+ if (!warned) {
+ fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
+ }
+ warned = 1;
+ }
+
for (cpu = 0; cpu < maxcpus; cpu++) {
for (cpu = 0; cpu < maxcpus; cpu++) {
- crdp = get_cpu_call_rcu_data(cpu);
- if (crdp == NULL)
+ crdp[cpu] = get_cpu_call_rcu_data(cpu);
+ if (crdp[cpu] == NULL)
continue;
set_cpu_call_rcu_data(cpu, NULL);
continue;
set_cpu_call_rcu_data(cpu, NULL);
- call_rcu_data_free(crdp);
+ /*
+ * Wait for call_rcu sites acting as RCU readers of the
+ * call_rcu_data to become quiescent.
+ */
+ synchronize_rcu();
+ for (cpu = 0; cpu < maxcpus; cpu++) {
+ if (crdp[cpu] == NULL)
+ continue;
+ call_rcu_data_free(crdp[cpu]);
+ }
+ free(crdp);
/* Cleanup call_rcu_data pointers before use */
maxcpus_reset();
free(per_cpu_call_rcu_data);
/* Cleanup call_rcu_data pointers before use */
maxcpus_reset();
free(per_cpu_call_rcu_data);
- per_cpu_call_rcu_data = NULL;
+ rcu_set_pointer(&per_cpu_call_rcu_data, NULL);
thread_call_rcu_data = NULL;
/* Dispose of all of the rest of the call_rcu_data structures. */
thread_call_rcu_data = NULL;
/* Dispose of all of the rest of the call_rcu_data structures. */
/*
* Exported functions
*/
/*
* Exported functions
*/
+
+/*
+ * get_cpu_call_rcu_data should be called with RCU read-side lock held.
+ * Callers should be registered RCU read-side threads.
+ */
struct call_rcu_data *get_cpu_call_rcu_data(int cpu);
pthread_t get_call_rcu_thread(struct call_rcu_data *crdp);
struct call_rcu_data *create_call_rcu_data(unsigned long flags,
int cpu_affinity);
int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp);
struct call_rcu_data *get_default_call_rcu_data(void);
struct call_rcu_data *get_cpu_call_rcu_data(int cpu);
pthread_t get_call_rcu_thread(struct call_rcu_data *crdp);
struct call_rcu_data *create_call_rcu_data(unsigned long flags,
int cpu_affinity);
int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp);
struct call_rcu_data *get_default_call_rcu_data(void);
+/*
+ * get_call_rcu_data should be called from registered RCU read-side
+ * threads.
+ */
struct call_rcu_data *get_call_rcu_data(void);
struct call_rcu_data *get_thread_call_rcu_data(void);
void set_thread_call_rcu_data(struct call_rcu_data *crdp);
int create_all_cpu_call_rcu_data(unsigned long flags);
struct call_rcu_data *get_call_rcu_data(void);
struct call_rcu_data *get_thread_call_rcu_data(void);
void set_thread_call_rcu_data(struct call_rcu_data *crdp);
int create_all_cpu_call_rcu_data(unsigned long flags);
+/*
+ * call_rcu should be called from registered RCU read-side threads.
+ */
void call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *head));
void call_rcu_data_free(struct call_rcu_data *crdp);
void call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *head));
void call_rcu_data_free(struct call_rcu_data *crdp);
#include "urcu/map/urcu-qsbr.h"
#define BUILD_QSBR_LIB
#include "urcu/static/urcu-qsbr.h"
#include "urcu/map/urcu-qsbr.h"
#define BUILD_QSBR_LIB
#include "urcu/static/urcu-qsbr.h"
+#include "urcu-pointer.h"
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
#include "urcu/wfqueue.h"
#include "urcu/map/urcu.h"
#include "urcu/static/urcu.h"
#include "urcu/wfqueue.h"
#include "urcu/map/urcu.h"
#include "urcu/static/urcu.h"
+#include "urcu-pointer.h"
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE