#include <string.h>
#include <errno.h>
#include <poll.h>
-#include <linux/futex.h>
#include <sys/time.h>
#include <syscall.h>
#include <unistd.h>
#include "urcu-defer.h"
#define futex(...) syscall(__NR_futex, __VA_ARGS__)
+#define FUTEX_WAIT 0
+#define FUTEX_WAKE 1
void __attribute__((destructor)) urcu_defer_exit(void);
static pthread_mutex_t urcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t defer_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
-int defer_thread_futex;
+static int defer_thread_futex;
/*
* Written to only by each individual deferer. Read by both the deferer and
* the reclamation tread.
*/
-struct defer_queue __thread defer_queue;
-
-/* Thread IDs of registered deferers */
-#define INIT_NUM_THREADS 4
-
-struct deferer_registry {
- pthread_t tid;
- struct defer_queue *defer_queue;
- unsigned long last_head;
-};
-
-static struct deferer_registry *registry;
-static int num_deferers, alloc_deferers;
-
+static struct defer_queue __thread defer_queue;
+static LIST_HEAD(registry);
static pthread_t tid_defer;
-/*
- * Wake-up any waiting defer thread. Called from many concurrent threads.
- */
-static void wake_up_defer(void)
-{
- if (unlikely(atomic_read(&defer_thread_futex) == -1))
- atomic_set(&defer_thread_futex, 0);
- futex(&defer_thread_futex, FUTEX_WAKE,
- 0, NULL, NULL, 0);
-}
-
-/*
- * Defer thread waiting. Single thread.
- */
-static void wait_defer(void)
-{
- atomic_dec(&defer_thread_futex);
- if (atomic_read(&defer_thread_futex) == -1)
- futex(&defer_thread_futex, FUTEX_WAIT, -1,
- NULL, NULL, 0);
-}
-
static void internal_urcu_lock(pthread_mutex_t *mutex)
{
int ret;
}
}
+/*
+ * Wake-up any waiting defer thread. Called from many concurrent threads.
+ */
+static void wake_up_defer(void)
+{
+ if (unlikely(uatomic_read(&defer_thread_futex) == -1)) {
+ uatomic_set(&defer_thread_futex, 0);
+ futex(&defer_thread_futex, FUTEX_WAKE, 1,
+ NULL, NULL, 0);
+ }
+}
+
+static unsigned long rcu_defer_num_callbacks(void)
+{
+ unsigned long num_items = 0, head;
+ struct defer_queue *index;
+
+ internal_urcu_lock(&urcu_defer_mutex);
+ list_for_each_entry(index, ®istry, list) {
+ head = LOAD_SHARED(index->head);
+ num_items += head - index->tail;
+ }
+ internal_urcu_unlock(&urcu_defer_mutex);
+ return num_items;
+}
+
+/*
+ * Defer thread waiting. Single thread.
+ */
+static void wait_defer(void)
+{
+ uatomic_dec(&defer_thread_futex);
+ smp_mb(); /* Write futex before read queue */
+ if (rcu_defer_num_callbacks()) {
+ smp_mb(); /* Read queue before write futex */
+ /* Callbacks are queued, don't wait. */
+ uatomic_set(&defer_thread_futex, 0);
+ } else {
+ smp_rmb(); /* Read queue before read futex */
+ if (uatomic_read(&defer_thread_futex) == -1)
+ futex(&defer_thread_futex, FUTEX_WAIT, -1,
+ NULL, NULL, 0);
+ }
+}
+
/*
* Must be called after Q.S. is reached.
*/
rcu_defer_barrier_queue(&defer_queue, head);
}
-
void rcu_defer_barrier_thread(void)
{
internal_urcu_lock(&urcu_defer_mutex);
void rcu_defer_barrier(void)
{
- struct deferer_registry *index;
+ struct defer_queue *index;
unsigned long num_items = 0;
- if (!registry)
+ if (list_empty(®istry))
return;
internal_urcu_lock(&urcu_defer_mutex);
- for (index = registry; index < registry + num_deferers; index++) {
- index->last_head = LOAD_SHARED(index->defer_queue->head);
- num_items += index->last_head - index->defer_queue->tail;
+ list_for_each_entry(index, ®istry, list) {
+ index->last_head = LOAD_SHARED(index->head);
+ num_items += index->last_head - index->tail;
}
if (likely(!num_items)) {
/*
goto end;
}
synchronize_rcu();
- for (index = registry; index < registry + num_deferers; index++)
- rcu_defer_barrier_queue(index->defer_queue,
- index->last_head);
+ list_for_each_entry(index, ®istry, list)
+ rcu_defer_barrier_queue(index, index->last_head);
end:
internal_urcu_unlock(&urcu_defer_mutex);
}
smp_wmb(); /* Publish new pointer before head */
/* Write q[] before head. */
STORE_SHARED(defer_queue.head, head);
+ smp_mb(); /* Write queue head before read futex */
/*
* Wake-up any waiting defer thread.
*/
_rcu_defer_queue(fct, p);
}
-static void rcu_add_deferer(pthread_t id)
-{
- struct deferer_registry *oldarray;
-
- if (!registry) {
- alloc_deferers = INIT_NUM_THREADS;
- num_deferers = 0;
- registry =
- malloc(sizeof(struct deferer_registry) * alloc_deferers);
- }
- if (alloc_deferers < num_deferers + 1) {
- oldarray = registry;
- registry = malloc(sizeof(struct deferer_registry)
- * (alloc_deferers << 1));
- memcpy(registry, oldarray,
- sizeof(struct deferer_registry) * alloc_deferers);
- alloc_deferers <<= 1;
- free(oldarray);
- }
- registry[num_deferers].tid = id;
- /* reference to the TLS of _this_ deferer thread. */
- registry[num_deferers].defer_queue = &defer_queue;
- registry[num_deferers].last_head = 0;
- num_deferers++;
-}
-
-/*
- * Never shrink (implementation limitation).
- * This is O(nb threads). Eventually use a hash table.
- */
-static void rcu_remove_deferer(pthread_t id)
-{
- struct deferer_registry *index;
-
- assert(registry != NULL);
- for (index = registry; index < registry + num_deferers; index++) {
- if (pthread_equal(index->tid, id)) {
- memcpy(index, ®istry[num_deferers - 1],
- sizeof(struct deferer_registry));
- registry[num_deferers - 1].tid = 0;
- registry[num_deferers - 1].defer_queue = NULL;
- registry[num_deferers - 1].last_head = 0;
- num_deferers--;
- return;
- }
- }
- /* Hrm not found, forgot to register ? */
- assert(0);
-}
-
static void start_defer_thread(void)
{
int ret;
- ret = pthread_create(&tid_defer, NULL, thr_defer,
- NULL);
+ ret = pthread_create(&tid_defer, NULL, thr_defer, NULL);
assert(!ret);
}
void rcu_defer_register_thread(void)
{
- int deferers;
+ int was_empty;
+
+ assert(defer_queue.last_head == 0);
+ assert(defer_queue.q == NULL);
+ defer_queue.q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
internal_urcu_lock(&defer_thread_mutex);
internal_urcu_lock(&urcu_defer_mutex);
- defer_queue.q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
- rcu_add_deferer(pthread_self());
- deferers = num_deferers;
+ was_empty = list_empty(®istry);
+ list_add(&defer_queue.list, ®istry);
internal_urcu_unlock(&urcu_defer_mutex);
- if (deferers == 1)
+ if (was_empty)
start_defer_thread();
internal_urcu_unlock(&defer_thread_mutex);
}
void rcu_defer_unregister_thread(void)
{
- int deferers;
+ int is_empty;
internal_urcu_lock(&defer_thread_mutex);
internal_urcu_lock(&urcu_defer_mutex);
- rcu_remove_deferer(pthread_self());
+ list_del(&defer_queue.list);
_rcu_defer_barrier_thread();
free(defer_queue.q);
defer_queue.q = NULL;
- deferers = num_deferers;
+ is_empty = list_empty(®istry);
internal_urcu_unlock(&urcu_defer_mutex);
- if (deferers == 0)
+ if (is_empty)
stop_defer_thread();
internal_urcu_unlock(&defer_thread_mutex);
}
void urcu_defer_exit(void)
{
- free(registry);
+ assert(list_empty(®istry));
}