* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#define _GNU_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>
#include <stdlib.h>
+#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <sys/time.h>
-#include <syscall.h>
#include <unistd.h>
+#include <sched.h>
#include "config.h"
#include "urcu/wfqueue.h"
#include "urcu-call-rcu.h"
#include "urcu-pointer.h"
#include "urcu/list.h"
+#include "urcu/futex.h"
/* Data structure that identifies a call_rcu thread. */
struct call_rcu_data {
struct cds_wfq_queue cbs;
unsigned long flags;
- pthread_mutex_t mtx;
- pthread_cond_t cond;
- unsigned long qlen;
+ int32_t futex;
+ unsigned long qlen; /* maintained for debugging. */
pthread_t tid;
+ int cpu_affinity;
struct cds_list_head list;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
}
}
+#if HAVE_SCHED_SETAFFINITY
+/*
+ * Bind the current worker thread to the CPU recorded in
+ * crdp->cpu_affinity. A negative value means no binding was
+ * requested and is treated as success.
+ */
+static
+int set_thread_cpu_affinity(struct call_rcu_data *crdp)
+{
+	cpu_set_t mask;
+
+	if (crdp->cpu_affinity < 0)
+		return 0;
+
+	CPU_ZERO(&mask);
+	CPU_SET(crdp->cpu_affinity, &mask);
+#if SCHED_SETAFFINITY_ARGS == 2
+	/* Old glibc prototype taking only (pid, mask). */
+	return sched_setaffinity(0, &mask);
+#else
+	return sched_setaffinity(0, sizeof(mask), &mask);
+#endif
+}
+#else
+/* sched_setaffinity() unavailable on this platform: binding is a no-op. */
+static
+int set_thread_cpu_affinity(struct call_rcu_data *crdp)
+{
+	return 0;
+}
+#endif
+
+/*
+ * Put the call_rcu worker thread to sleep on its futex until a wakeup
+ * is posted. Only sleeps if the futex still reads -1, i.e. no wakeup
+ * raced with the worker's earlier decrement.
+ */
+static void call_rcu_wait(struct call_rcu_data *crdp)
+{
+	/* Read call_rcu list before reading futex */
+	cmm_smp_mb();
+	if (uatomic_read(&crdp->futex) == -1)
+		futex_async(&crdp->futex, FUTEX_WAIT, -1,
+		      NULL, NULL, 0);
+}
+
+/*
+ * Wake the call_rcu worker thread if it is (or is about to go)
+ * sleeping on its futex. Resetting the futex to 0 before FUTEX_WAKE
+ * also covers the case where the worker has decremented the futex but
+ * not yet called FUTEX_WAIT: its -1 check in call_rcu_wait() will fail
+ * and it will not block.
+ */
+static void call_rcu_wake_up(struct call_rcu_data *crdp)
+{
+	/* Write to call_rcu list before reading/writing futex */
+	cmm_smp_mb();
+	if (unlikely(uatomic_read(&crdp->futex) == -1)) {
+		uatomic_set(&crdp->futex, 0);
+		futex_async(&crdp->futex, FUTEX_WAKE, 1,
+		      NULL, NULL, 0);
+	}
+}
+
/* This is the code run by each call_rcu thread. */
static void *call_rcu_thread(void *arg)
struct cds_wfq_node **cbs_tail;
struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
struct rcu_head *rhp;
+ int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
+
+ if (set_thread_cpu_affinity(crdp) != 0) {
+ perror("pthread_setaffinity_np");
+ exit(-1);
+ }
thread_call_rcu_data = crdp;
+ if (!rt) {
+ uatomic_dec(&crdp->futex);
+ /* Decrement futex before reading call_rcu list */
+ cmm_smp_mb();
+ }
for (;;) {
if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
} while (cbs != NULL);
uatomic_sub(&crdp->qlen, cbcount);
}
- if (crdp->flags & URCU_CALL_RCU_STOP)
+ if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP)
break;
- if (crdp->flags & URCU_CALL_RCU_RT)
- poll(NULL, 0, 10);
- else {
- call_rcu_lock(&crdp->mtx);
- _CMM_STORE_SHARED(crdp->flags,
- crdp->flags & ~URCU_CALL_RCU_RUNNING);
- if (&crdp->cbs.head ==
- _CMM_LOAD_SHARED(crdp->cbs.tail) &&
- pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
- perror("pthread_cond_wait");
- exit(-1);
+ if (!rt) {
+ if (&crdp->cbs.head
+ == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+ call_rcu_wait(crdp);
+ poll(NULL, 0, 10);
+ uatomic_dec(&crdp->futex);
+ /*
+ * Decrement futex before reading
+ * call_rcu list.
+ */
+ cmm_smp_mb();
+ } else {
+ poll(NULL, 0, 10);
}
- _CMM_STORE_SHARED(crdp->flags,
- crdp->flags | URCU_CALL_RCU_RUNNING);
+ } else {
poll(NULL, 0, 10);
- call_rcu_unlock(&crdp->mtx);
}
}
- call_rcu_lock(&crdp->mtx);
- crdp->flags |= URCU_CALL_RCU_STOPPED;
- call_rcu_unlock(&crdp->mtx);
+ if (!rt) {
+ /*
+ * Read call_rcu list before write futex.
+ */
+ cmm_smp_mb();
+ uatomic_set(&crdp->futex, 0);
+ }
+ uatomic_or(&crdp->flags, URCU_CALL_RCU_STOPPED);
return NULL;
}
*/
static void call_rcu_data_init(struct call_rcu_data **crdpp,
- unsigned long flags)
+ unsigned long flags,
+ int cpu_affinity)
{
struct call_rcu_data *crdp;
memset(crdp, '\0', sizeof(*crdp));
cds_wfq_init(&crdp->cbs);
crdp->qlen = 0;
- if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
- perror("pthread_mutex_init");
- exit(-1);
- }
- if (pthread_cond_init(&crdp->cond, NULL) != 0) {
- perror("pthread_cond_init");
- exit(-1);
- }
- crdp->flags = flags | URCU_CALL_RCU_RUNNING;
+ crdp->futex = 0;
+ crdp->flags = flags;
cds_list_add(&crdp->list, &call_rcu_data_list);
+ crdp->cpu_affinity = cpu_affinity;
cmm_smp_mb(); /* Structure initialized before pointer is planted. */
*crdpp = crdp;
if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
* Create a call_rcu_data structure (with thread) and return a pointer.
*/
-static struct call_rcu_data *__create_call_rcu_data(unsigned long flags)
+/*
+ * Allocate and initialize a call_rcu_data structure and its worker
+ * thread, optionally bound to a CPU (cpu_affinity < 0 means unbound).
+ * Caller must hold call_rcu_mutex.
+ */
+static struct call_rcu_data *__create_call_rcu_data(unsigned long flags,
+						    int cpu_affinity)
{
	struct call_rcu_data *crdp;

	call_rcu_data_init(&crdp, flags, cpu_affinity);
	return crdp;
}
-struct call_rcu_data *create_call_rcu_data(unsigned long flags)
+/*
+ * Public entry point: create a call_rcu worker, serialized by
+ * call_rcu_mutex. cpu_affinity < 0 requests no CPU binding.
+ */
+struct call_rcu_data *create_call_rcu_data(unsigned long flags,
+					   int cpu_affinity)
{
	struct call_rcu_data *crdp;

	call_rcu_lock(&call_rcu_mutex);
	crdp = __create_call_rcu_data(flags, cpu_affinity);
	call_rcu_unlock(&call_rcu_mutex);
	return crdp;
}
call_rcu_unlock(&call_rcu_mutex);
return default_call_rcu_data;
}
- call_rcu_data_init(&default_call_rcu_data, 0);
+ call_rcu_data_init(&default_call_rcu_data, 0, -1);
call_rcu_unlock(&call_rcu_mutex);
return default_call_rcu_data;
}
call_rcu_unlock(&call_rcu_mutex);
continue;
}
- crdp = __create_call_rcu_data(flags);
+ crdp = __create_call_rcu_data(flags, i);
if (crdp == NULL) {
call_rcu_unlock(&call_rcu_mutex);
errno = ENOMEM;
*/
static void wake_call_rcu_thread(struct call_rcu_data *crdp)
{
-	if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
-		call_rcu_lock(&crdp->mtx);
-		if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
-			if (pthread_cond_signal(&crdp->cond) != 0) {
-				perror("pthread_cond_signal");
-				exit(-1);
-			}
-		}
-		call_rcu_unlock(&crdp->mtx);
-	}
+	/* RT workers poll instead of sleeping on the futex; skip them. */
+	if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT))
+		call_rcu_wake_up(crdp);
}
/*
if (crdp == NULL || crdp == default_call_rcu_data) {
return;
}
- if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) {
- call_rcu_lock(&crdp->mtx);
- crdp->flags |= URCU_CALL_RCU_STOP;
- call_rcu_unlock(&crdp->mtx);
+ if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
+ uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
wake_call_rcu_thread(crdp);
- while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0)
+ while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
poll(NULL, 0, 1);
}
if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
if (crdp == default_call_rcu_data)
crdp = cds_list_entry(crdp->list.prev,
struct call_rcu_data, list);
- crdp->flags = URCU_CALL_RCU_STOPPED;
+ uatomic_set(&crdp->flags, URCU_CALL_RCU_STOPPED);
call_rcu_data_free(crdp);
}
}