* - Split-counters are used to keep track of the number of
* nodes within the hash table for automatic resize triggering.
* - Resize operation initiated by long chain detection is executed by a
- * call_rcu thread, which keeps lock-freedom of add and remove.
+ * worker thread, which keeps the add and remove operations lock-free.
* - Resize operations are protected by a mutex.
* - The removal operation is split in two parts: first, a "removed"
* flag is set in the next pointer within the node to remove. Then,
#include <unistd.h>
#include "compat-getcpu.h"
-#include <urcu-pointer.h>
-#include <urcu-call-rcu.h>
-#include <urcu-flavor.h>
+#include <urcu/pointer.h>
+#include <urcu/call-rcu.h>
+#include <urcu/flavor.h>
#include <urcu/arch.h>
#include <urcu/uatomic.h>
#include <urcu/compiler.h>
#include <urcu/rculfhash.h>
+#include <urcu/static/urcu-signal-nr.h>
#include <rculfhash-internal.h>
#include <stdio.h>
#include <pthread.h>
+#include <signal.h>
+#include "workqueue.h"
+#include "urcu-die.h"
+#include "urcu-utils.h"
/*
* Split-counters lazily update the global counter each 1024
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
/*
- * rcu_resize_work: Contains arguments passed to RCU worker thread
+ * resize_work: Contains arguments passed to the worker thread
* responsible for performing lazy resize.
*/
-struct rcu_resize_work {
- struct rcu_head head;
+struct resize_work {
+ struct urcu_work work;
struct cds_lfht *ht;
};
unsigned long start, unsigned long len);
};
+static struct urcu_workqueue *cds_lfht_workqueue;
+static unsigned long cds_lfht_workqueue_user_count;
+
+/*
+ * Mutex ensuring mutual exclusion between workqueue initialization and
+ * fork handlers. cds_lfht_fork_mutex nests inside call_rcu_mutex.
+ */
+static pthread_mutex_t cds_lfht_fork_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static struct urcu_atfork cds_lfht_atfork;
+
+/*
+ * atfork handler nesting counter. The handler may be registered with
+ * several urcu flavors, and can therefore be invoked more than once in
+ * the pthread_atfork list of callbacks.
+ */
+static int cds_lfht_workqueue_atfork_nesting;
+
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
+
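+/*
+ * When CONFIG_CDS_LFHT_ITER_DEBUG is enabled, each iterator records
+ * the hash table it was initialized against, so that using it with a
+ * different hash table instance trips an assertion.
+ */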
+#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
+
+static
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+ iter->lfht = ht;
+}
+
+#define cds_lfht_iter_debug_assert(...) assert(__VA_ARGS__)
+
+#else
+
+static
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+}
+
+#define cds_lfht_iter_debug_assert(...)
+
+#endif
+
/*
* Algorithm to reverse bits in a word by lookup table, extended to
* 64-bit words.
* Returns 0 if no bit is set, else returns the position of the most
* significant bit (from 1 to 32 on 32-bit, from 1 to 64 on 64-bit).
*/
-#if defined(__i386) || defined(__x86_64)
+#if defined(URCU_ARCH_X86)
static inline
unsigned int fls_u32(uint32_t x)
{
#define HAS_FLS_U32
#endif
-#if defined(__x86_64)
+#if defined(URCU_ARCH_AMD64)
static inline
unsigned int fls_u64(uint64_t x)
{
void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
unsigned long count);
+static void mutex_lock(pthread_mutex_t *mutex)
+{
+ int ret;
+
+#ifndef DISTRUST_SIGNALS_EXTREME
+ ret = pthread_mutex_lock(mutex);
+ if (ret)
+ urcu_die(ret);
+#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
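+	/*
+	 * Spin on trylock so that a deferred memory barrier requested
+	 * by the signal-based flavor (need_mb) can still be serviced
+	 * while this thread waits for the mutex.
+	 */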
+ while ((ret = pthread_mutex_trylock(mutex)) != 0) {
+ if (ret != EBUSY && ret != EINTR)
+ urcu_die(ret);
+ if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) {
+ cmm_smp_mb();
+ _CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0);
+ cmm_smp_mb();
+ }
+ (void) poll(NULL, 0, 10);
+ }
+#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
+}
+
+static void mutex_unlock(pthread_mutex_t *mutex)
+{
+ int ret;
+
+ ret = pthread_mutex_unlock(mutex);
+ if (ret)
+ urcu_die(ret);
+}
+
static long nr_cpus_mask = -1;
static long split_count_mask = -1;
static int split_count_order = -1;
static
void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
{
- unsigned long split_count;
+ unsigned long split_count, count;
int index;
- long count;
if (caa_unlikely(!ht->split_count))
return;
if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
return;
- dbg_printf("add set global %ld\n", count);
+ dbg_printf("add set global %lu\n", count);
cds_lfht_resize_lazy_count(ht, size,
count >> (CHAIN_LEN_TARGET - 1));
}
static
void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash)
{
- unsigned long split_count;
+ unsigned long split_count, count;
int index;
- long count;
if (caa_unlikely(!ht->split_count))
return;
}
static
-int is_removed(struct cds_lfht_node *node)
+int is_removed(const struct cds_lfht_node *node)
{
return ((unsigned long) node) & REMOVED_FLAG;
}
if (unique_ret
&& !is_bucket(next)
&& clear_flag(iter)->reverse_hash == node->reverse_hash) {
- struct cds_lfht_iter d_iter = { .node = node, .next = iter, };
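+		/*
+		 * With iterator debugging, the on-stack iterator must
+		 * also record the hash table it belongs to.
+		 */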
+ struct cds_lfht_iter d_iter = {
+ .node = node,
+ .next = iter,
+#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
+ .lfht = ht,
+#endif
+ };
/*
* uniquely adding inserts the node as the first
{
unsigned long partition_len, start = 0;
struct partition_resize_work *work;
- int thread, ret;
- unsigned long nr_threads;
+ int ret;
+ unsigned long thread, nr_threads;
assert(nr_cpus_mask != -1);
if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
* partition size, up to the number of CPUs in the system.
*/
if (nr_cpus_mask > 0) {
- nr_threads = min(nr_cpus_mask + 1,
+ nr_threads = min_t(unsigned long, nr_cpus_mask + 1,
len >> MIN_PARTITION_PER_THREAD_ORDER);
} else {
nr_threads = 1;
if (start == 0 && nr_threads > 0)
return;
fallback:
- ht->flavor->thread_online();
fct(ht, i, start, len);
- ht->flavor->thread_offline();
}
/*
* Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
* problem).
*
* When we reach a certain length, we can split this population phase over
/*
* Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
* problem).
* For a single level, we logically remove and garbage collect each node.
*
*
* Concurrent removal and add operations are helping us perform garbage
* collection of logically removed nodes. We guarantee that all logically
- * removed nodes have been garbage-collected (unlinked) before call_rcu is
- * invoked to free a hole level of bucket nodes (after a grace period).
+ * removed nodes have been garbage-collected (unlinked) before work
+ * enqueue is invoked to free a whole level of bucket nodes (after a
+ * grace period).
*
* Logical removal and garbage collection can therefore be done in batch
* or on a node-per-node basis, as long as the guarantee above holds.
void fini_table(struct cds_lfht *ht,
unsigned long first_order, unsigned long last_order)
{
- long i;
- unsigned long free_by_rcu_order = 0;
+ unsigned long free_by_rcu_order = 0, i;
dbg_printf("fini table: first_order %lu last_order %lu\n",
first_order, last_order);
}
}
+/*
+ * Never called with size < 1.
+ */
static
void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
{
struct cds_lfht_node *prev, *node;
unsigned long order, len, i;
+ int bucket_order;
cds_lfht_alloc_bucket_table(ht, 0);
node->next = flag_bucket(get_end());
node->reverse_hash = 0;
- for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) {
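+	/* size >= 1 (see above), hence the count order is >= 0. */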
+ bucket_order = cds_lfht_get_count_order_ulong(size);
+ assert(bucket_order >= 0);
+
+ for (order = 1; order < (unsigned long) bucket_order + 1; order++) {
len = 1UL << (order - 1);
cds_lfht_alloc_bucket_table(ht, order);
}
}
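+/*
+ * get_mm_type() selects the default memory management plugin; it is
+ * only consulted when the caller passes mm == NULL to _cds_lfht_new.
+ */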
+#if (CAA_BITS_PER_LONG > 32)
+/*
+ * For 64-bit architectures, with max number of buckets small enough not to
+ * use the entire 64-bit memory mapping space (and allowing a fair number of
+ * hash table instances), use the mmap allocator, which is faster. Otherwise,
+ * fall back to the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+ if (max_nr_buckets && max_nr_buckets <= (1ULL << 32))
+ return &cds_lfht_mm_mmap;
+ else
+ return &cds_lfht_mm_order;
+}
+#else
+/*
+ * For 32-bit architectures, use the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+ return &cds_lfht_mm_order;
+}
+#endif
+
struct cds_lfht *_cds_lfht_new(unsigned long init_size,
unsigned long min_nr_alloc_buckets,
unsigned long max_nr_buckets,
/*
* Memory management plugin default.
*/
- if (!mm) {
- if (CAA_BITS_PER_LONG > 32
- && max_nr_buckets
- && max_nr_buckets <= (1ULL << 32)) {
- /*
- * For 64-bit architectures, with max number of
- * buckets small enough not to use the entire
- * 64-bit memory mapping space (and allowing a
- * fair number of hash table instances), use the
- * mmap allocator, which is faster than the
- * order allocator.
- */
- mm = &cds_lfht_mm_mmap;
- } else {
- /*
- * The fallback is to use the order allocator.
- */
- mm = &cds_lfht_mm_order;
- }
- }
+ if (!mm)
+ mm = get_mm_type(max_nr_buckets);
/* max_nr_buckets == 0 for order based mm means infinite */
if (mm == &cds_lfht_mm_order && !max_nr_buckets)
if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
return NULL;
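+	/*
+	 * The shared resize worker is reference-counted: it is created
+	 * on first use and torn down by cds_lfht_destroy through
+	 * cds_lfht_fini_worker.
+	 */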
+ if (flags & CDS_LFHT_AUTO_RESIZE)
+ cds_lfht_init_worker(flavor);
+
min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
init_size = max(init_size, MIN_TABLE_SIZE);
max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
struct cds_lfht_node *node, *next, *bucket;
unsigned long reverse_hash, size;
+ cds_lfht_iter_debug_set_ht(ht, iter);
+
reverse_hash = bit_reverse_ulong(hash);
size = rcu_dereference(ht->size);
struct cds_lfht_node *node, *next;
unsigned long reverse_hash;
+ cds_lfht_iter_debug_assert(ht == iter->lfht);
node = iter->node;
reverse_hash = node->reverse_hash;
next = iter->next;
{
struct cds_lfht_node *node, *next;
+ cds_lfht_iter_debug_assert(ht == iter->lfht);
node = clear_flag(iter->next);
for (;;) {
if (caa_unlikely(is_end(node))) {
void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter)
{
+ cds_lfht_iter_debug_set_ht(ht, iter);
/*
* Get next after first bucket node. The first bucket node is the
* first node of the linked list.
return ret;
}
-int cds_lfht_is_node_deleted(struct cds_lfht_node *node)
+int cds_lfht_is_node_deleted(const struct cds_lfht_node *node)
{
return is_removed(CMM_LOAD_SHARED(node->next));
}
*/
int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
{
- int ret, was_online;
-
- /* Wait for in-flight resize operations to complete */
- _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
- cmm_smp_mb(); /* Store destroy before load resize */
- was_online = ht->flavor->read_ongoing();
- if (was_online)
- ht->flavor->thread_offline();
- /* Calling with RCU read-side held is an error. */
- if (ht->flavor->read_ongoing()) {
- ret = -EINVAL;
- if (was_online)
- ht->flavor->thread_online();
- goto end;
+ int ret;
+
+ if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+ /* Cancel ongoing resize operations. */
+ _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
+ /* Wait for in-flight resize operations to complete */
+ urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
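+		/*
+		 * Once the queued work has drained, no resize worker
+		 * can still reference this hash table.
+		 */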
}
- while (uatomic_read(&ht->in_progress_resize))
- poll(NULL, 0, 100); /* wait for 100ms */
- if (was_online)
- ht->flavor->thread_online();
ret = cds_lfht_delete_bucket(ht);
if (ret)
return ret;
ret = pthread_mutex_destroy(&ht->resize_mutex);
if (ret)
ret = -EBUSY;
+ if (ht->flags & CDS_LFHT_AUTO_RESIZE)
+ cds_lfht_fini_worker(ht->flavor);
poison_free(ht);
-end:
return ret;
}
* Resize table, re-do if the target size has changed under us.
*/
do {
- assert(uatomic_read(&ht->in_progress_resize));
if (CMM_LOAD_SHARED(ht->in_progress_destroy))
break;
ht->resize_initiated = 1;
void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
{
- int was_online;
-
- was_online = ht->flavor->read_ongoing();
- if (was_online)
- ht->flavor->thread_offline();
- /* Calling with RCU read-side held is an error. */
- if (ht->flavor->read_ongoing()) {
- static int print_once;
-
- if (!CMM_LOAD_SHARED(print_once))
- fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
- "called with RCU read-side lock held.\n");
- CMM_STORE_SHARED(print_once, 1);
- assert(0);
- goto end;
- }
resize_target_update_count(ht, new_size);
CMM_STORE_SHARED(ht->resize_initiated, 1);
- pthread_mutex_lock(&ht->resize_mutex);
+ mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
- pthread_mutex_unlock(&ht->resize_mutex);
-end:
- if (was_online)
- ht->flavor->thread_online();
+ mutex_unlock(&ht->resize_mutex);
}
static
-void do_resize_cb(struct rcu_head *head)
+void do_resize_cb(struct urcu_work *work)
{
- struct rcu_resize_work *work =
- caa_container_of(head, struct rcu_resize_work, head);
- struct cds_lfht *ht = work->ht;
+ struct resize_work *resize_work =
+ caa_container_of(work, struct resize_work, work);
+ struct cds_lfht *ht = resize_work->ht;
- ht->flavor->thread_offline();
- pthread_mutex_lock(&ht->resize_mutex);
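+	/*
+	 * The workqueue worker is not a registered RCU reader thread;
+	 * register it with the flavor for the duration of the resize.
+	 */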
+ ht->flavor->register_thread();
+ mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
- pthread_mutex_unlock(&ht->resize_mutex);
- ht->flavor->thread_online();
+ mutex_unlock(&ht->resize_mutex);
+ ht->flavor->unregister_thread();
poison_free(work);
- cmm_smp_mb(); /* finish resize before decrement */
- uatomic_dec(&ht->in_progress_resize);
}
static
void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
{
- struct rcu_resize_work *work;
+ struct resize_work *work;
/* Store resize_target before read resize_initiated */
cmm_smp_mb();
if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
- uatomic_inc(&ht->in_progress_resize);
- cmm_smp_mb(); /* increment resize count before load destroy */
if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
- uatomic_dec(&ht->in_progress_resize);
return;
}
work = malloc(sizeof(*work));
if (work == NULL) {
dbg_printf("error allocating resize work, bailing out\n");
- uatomic_dec(&ht->in_progress_resize);
return;
}
work->ht = ht;
- ht->flavor->update_call_rcu(&work->head, do_resize_cb);
+ urcu_workqueue_queue_work(cds_lfht_workqueue,
+ &work->work, do_resize_cb);
CMM_STORE_SHARED(ht->resize_initiated, 1);
}
}
}
__cds_lfht_resize_lazy_launch(ht);
}
+
+static void cds_lfht_before_fork(void *priv)
+{
+ if (cds_lfht_workqueue_atfork_nesting++)
+ return;
+ mutex_lock(&cds_lfht_fork_mutex);
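+	/*
+	 * The fork mutex is deliberately left held across fork(): it is
+	 * released by the after-fork handlers in the parent and child.
+	 */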
+ if (!cds_lfht_workqueue)
+ return;
+ urcu_workqueue_pause_worker(cds_lfht_workqueue);
+}
+
+static void cds_lfht_after_fork_parent(void *priv)
+{
+ if (--cds_lfht_workqueue_atfork_nesting)
+ return;
+ if (!cds_lfht_workqueue)
+ goto end;
+ urcu_workqueue_resume_worker(cds_lfht_workqueue);
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_after_fork_child(void *priv)
+{
+ if (--cds_lfht_workqueue_atfork_nesting)
+ return;
+ if (!cds_lfht_workqueue)
+ goto end;
+ urcu_workqueue_create_worker(cds_lfht_workqueue);
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static struct urcu_atfork cds_lfht_atfork = {
+ .before_fork = cds_lfht_before_fork,
+ .after_fork_parent = cds_lfht_after_fork_parent,
+ .after_fork_child = cds_lfht_after_fork_child,
+};
+
+/*
+ * Block all signals for the workqueue worker thread to ensure we don't
+ * disturb the application. The SIGRCU signal needs to be unblocked for
+ * the urcu-signal flavor.
+ */
+static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
+ void *priv)
+{
+ int ret;
+ sigset_t mask;
+
+ ret = sigfillset(&mask);
+ if (ret)
+ urcu_die(errno);
+ ret = sigdelset(&mask, SIGRCU);
+ if (ret)
+ urcu_die(errno);
+ ret = pthread_sigmask(SIG_SETMASK, &mask, NULL);
+ if (ret)
+ urcu_die(ret);
+}
+
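+/*
+ * The workqueue is shared by all auto-resize hash tables in the
+ * process and reference-counted via cds_lfht_workqueue_user_count.
+ */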
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
+{
+ flavor->register_rculfhash_atfork(&cds_lfht_atfork);
+
+ mutex_lock(&cds_lfht_fork_mutex);
+ if (cds_lfht_workqueue_user_count++)
+ goto end;
+ cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+ NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+{
+ mutex_lock(&cds_lfht_fork_mutex);
+ if (--cds_lfht_workqueue_user_count)
+ goto end;
+ urcu_workqueue_destroy(cds_lfht_workqueue);
+ cds_lfht_workqueue = NULL;
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+
+ flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
+}