+// SPDX-FileCopyrightText: 2010-2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+// SPDX-FileCopyrightText: 2011 Lai Jiangshan <laijs@cn.fujitsu.com>
+//
+// SPDX-License-Identifier: LGPL-2.1-or-later
+
/*
- * rculfhash.c
- *
* Userspace RCU library - Lock-Free Resizable RCU Hash Table
- *
- * Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- * Copyright 2011 - Lai Jiangshan <laijs@cn.fujitsu.com>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
#define _LGPL_SOURCE
#include <stdlib.h>
#include <errno.h>
-#include <assert.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "compat-getcpu.h"
+#include <urcu/assert.h>
#include <urcu/pointer.h>
#include <urcu/call-rcu.h>
#include <urcu/flavor.h>
#include <urcu/uatomic.h>
#include <urcu/compiler.h>
#include <urcu/rculfhash.h>
-#include <rculfhash-internal.h>
-#include <stdio.h>
#include <pthread.h>
#include <signal.h>
+#include "rculfhash-internal.h"
#include "workqueue.h"
#include "urcu-die.h"
+#include "urcu-utils.h"
+#include "compat-smp.h"
/*
* Split-counters lazily update the global counter each 1024
unsigned long start, unsigned long len);
};
+enum nr_cpus_mask_state {
+ NR_CPUS_MASK_INIT_FAILED = -2,
+ NR_CPUS_MASK_UNINITIALIZED = -1,
+};
+
static struct urcu_workqueue *cds_lfht_workqueue;
-static unsigned long cds_lfht_workqueue_user_count;
/*
* Mutex ensuring mutual exclusion between workqueue initialization and
*/
static int cds_lfht_workqueue_atfork_nesting;
+static void __attribute__((destructor)) cds_lfht_exit(void);
static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
iter->lfht = ht;
}
-#define cds_lfht_iter_debug_assert(...) assert(__VA_ARGS__)
+#define cds_lfht_iter_debug_assert(...) urcu_posix_assert(__VA_ARGS__)
#else
static
-void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht __attribute__((unused)),
+ struct cds_lfht_iter *iter __attribute__((unused)))
{
}
* Returns 0 if no bit is set, else returns the position of the most
* significant bit (from 1 to 32 on 32-bit, from 1 to 64 on 64-bit).
*/
-#if defined(__i386) || defined(__x86_64)
+#if defined(URCU_ARCH_X86)
static inline
unsigned int fls_u32(uint32_t x)
{
#define HAS_FLS_U32
#endif
-#if defined(__x86_64)
+#if defined(URCU_ARCH_AMD64)
static inline
unsigned int fls_u64(uint64_t x)
{
* Return the minimum order for which x <= (1UL << order).
* Return -1 if x is 0.
*/
+static
int cds_lfht_get_count_order_u32(uint32_t x)
{
if (!x)
if (ret != EBUSY && ret != EINTR)
urcu_die(ret);
if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) {
- cmm_smp_mb();
- _CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0);
- cmm_smp_mb();
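+		/*
+		 * A single sequentially-consistent store provides the
+		 * ordering of the two explicit barriers it replaces
+		 * around the need_mb store.
+		 */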
+ uatomic_store(&URCU_TLS(rcu_reader).need_mb, 0, CMM_SEQ_CST);
}
(void) poll(NULL, 0, 10);
}
urcu_die(ret);
}
-static long nr_cpus_mask = -1;
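+/*
+ * Once initialized, nr_cpus_mask holds the number of possible CPUs
+ * rounded up to a power of two, minus one.
+ */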
+static long nr_cpus_mask = NR_CPUS_MASK_UNINITIALIZED;
static long split_count_mask = -1;
static int split_count_order = -1;
-#if defined(HAVE_SYSCONF)
static void ht_init_nr_cpus_mask(void)
{
long maxcpus;
- maxcpus = sysconf(_SC_NPROCESSORS_CONF);
+ maxcpus = get_possible_cpus_array_len();
if (maxcpus <= 0) {
- nr_cpus_mask = -2;
+ nr_cpus_mask = NR_CPUS_MASK_INIT_FAILED;
return;
}
/*
maxcpus = 1UL << cds_lfht_get_count_order_ulong(maxcpus);
nr_cpus_mask = maxcpus - 1;
}
-#else /* #if defined(HAVE_SYSCONF) */
-static void ht_init_nr_cpus_mask(void)
-{
- nr_cpus_mask = -2;
-}
-#endif /* #else #if defined(HAVE_SYSCONF) */
static
void alloc_split_items_count(struct cds_lfht *ht)
{
- if (nr_cpus_mask == -1) {
+ if (nr_cpus_mask == NR_CPUS_MASK_UNINITIALIZED) {
ht_init_nr_cpus_mask();
if (nr_cpus_mask < 0)
split_count_mask = DEFAULT_SPLIT_COUNT_MASK;
cds_lfht_get_count_order_ulong(split_count_mask + 1);
}
- assert(split_count_mask >= 0);
+ urcu_posix_assert(split_count_mask >= 0);
if (ht->flags & CDS_LFHT_ACCOUNTING) {
ht->split_count = calloc(split_count_mask + 1,
sizeof(struct ht_items_count));
- assert(ht->split_count);
+ urcu_posix_assert(ht->split_count);
} else {
ht->split_count = NULL;
}
{
int cpu;
- assert(split_count_mask >= 0);
+ urcu_posix_assert(split_count_mask >= 0);
cpu = urcu_sched_getcpu();
if (caa_unlikely(cpu < 0))
return hash & split_count_mask;
static
void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
{
- unsigned long split_count;
+ unsigned long split_count, count;
int index;
- long count;
if (caa_unlikely(!ht->split_count))
return;
if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
return;
- dbg_printf("add set global %ld\n", count);
+ dbg_printf("add set global %lu\n", count);
cds_lfht_resize_lazy_count(ht, size,
count >> (CHAIN_LEN_TARGET - 1));
}
static
void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash)
{
- unsigned long split_count;
+ unsigned long split_count, count;
int index;
- long count;
if (caa_unlikely(!ht->split_count))
return;
if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
return;
- dbg_printf("del set global %ld\n", count);
+ dbg_printf("del set global %lu\n", count);
/*
* Don't shrink table if the number of nodes is below a
* certain threshold.
}
static
-int is_removed(struct cds_lfht_node *node)
+int is_removed(const struct cds_lfht_node *node)
{
return ((unsigned long) node) & REMOVED_FLAG;
}
return ((unsigned long) node) & REMOVAL_OWNER_FLAG;
}
+static
+struct cds_lfht_node *flag_removed(struct cds_lfht_node *node)
+{
+ return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG);
+}
+
static
struct cds_lfht_node *flag_removal_owner(struct cds_lfht_node *node)
{
old1 = uatomic_read(ptr);
do {
old2 = old1;
- if (old2 >= v)
+ if (old2 >= v) {
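+			/*
+			 * Provide the same memory-barrier semantics as
+			 * the cmpxchg path even on early return, so
+			 * callers can rely on the ordering either way.
+			 */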
+ cmm_smp_mb();
return old2;
+ }
} while ((old1 = uatomic_cmpxchg(ptr, old2, v)) != old2);
return old2;
}
struct cds_lfht_node *lookup_bucket(struct cds_lfht *ht, unsigned long size,
unsigned long hash)
{
- assert(size > 0);
+ urcu_posix_assert(size > 0);
return bucket_at(ht, hash & (size - 1));
}
{
struct cds_lfht_node *iter_prev, *iter, *next, *new_next;
- assert(!is_bucket(bucket));
- assert(!is_removed(bucket));
- assert(!is_removal_owner(bucket));
- assert(!is_bucket(node));
- assert(!is_removed(node));
- assert(!is_removal_owner(node));
+ urcu_posix_assert(!is_bucket(bucket));
+ urcu_posix_assert(!is_removed(bucket));
+ urcu_posix_assert(!is_removal_owner(bucket));
+ urcu_posix_assert(!is_bucket(node));
+ urcu_posix_assert(!is_removed(node));
+ urcu_posix_assert(!is_removal_owner(node));
for (;;) {
iter_prev = bucket;
/* We can always skip the bucket node initially */
iter = rcu_dereference(iter_prev->next);
- assert(!is_removed(iter));
- assert(!is_removal_owner(iter));
- assert(iter_prev->reverse_hash <= node->reverse_hash);
+ urcu_posix_assert(!is_removed(iter));
+ urcu_posix_assert(!is_removal_owner(iter));
+ urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash);
/*
* We should never be called with bucket (start of chain)
* and logically removed node (end of path compression
* marker) being the actual same node. This would be a
* bug in the algorithm implementation.
*/
- assert(bucket != node);
+ urcu_posix_assert(bucket != node);
for (;;) {
if (caa_unlikely(is_end(iter)))
return;
iter_prev = clear_flag(iter);
iter = next;
}
- assert(!is_removed(iter));
- assert(!is_removal_owner(iter));
+ urcu_posix_assert(!is_removed(iter));
+ urcu_posix_assert(!is_removal_owner(iter));
if (is_bucket(iter))
new_next = flag_bucket(clear_flag(next));
else
if (!old_node) /* Return -ENOENT if asked to replace NULL node */
return -ENOENT;
- assert(!is_removed(old_node));
- assert(!is_removal_owner(old_node));
- assert(!is_bucket(old_node));
- assert(!is_removed(new_node));
- assert(!is_removal_owner(new_node));
- assert(!is_bucket(new_node));
- assert(new_node != old_node);
+ urcu_posix_assert(!is_removed(old_node));
+ urcu_posix_assert(!is_removal_owner(old_node));
+ urcu_posix_assert(!is_bucket(old_node));
+ urcu_posix_assert(!is_removed(new_node));
+ urcu_posix_assert(!is_removal_owner(new_node));
+ urcu_posix_assert(!is_bucket(new_node));
+ urcu_posix_assert(new_node != old_node);
for (;;) {
/* Insert after node to be replaced */
if (is_removed(old_next)) {
*/
return -ENOENT;
}
- assert(old_next == clear_flag(old_next));
- assert(new_node != old_next);
+ urcu_posix_assert(old_next == clear_flag(old_next));
+ urcu_posix_assert(new_node != old_next);
/*
* REMOVAL_OWNER flag is _NEVER_ set before the REMOVED
* flag. It is either set atomically at the same time
* (replace) or after (del).
*/
- assert(!is_removal_owner(old_next));
+ urcu_posix_assert(!is_removal_owner(old_next));
new_node->next = old_next;
/*
* Here is the whole trick for lock-free replace: we add
bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash));
_cds_lfht_gc_bucket(bucket, new_node);
- assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
+ urcu_posix_assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
return 0;
}
*return_node;
struct cds_lfht_node *bucket;
- assert(!is_bucket(node));
- assert(!is_removed(node));
- assert(!is_removal_owner(node));
+ urcu_posix_assert(!is_bucket(node));
+ urcu_posix_assert(!is_removed(node));
+ urcu_posix_assert(!is_removal_owner(node));
bucket = lookup_bucket(ht, size, hash);
for (;;) {
uint32_t chain_len = 0;
iter_prev = bucket;
/* We can always skip the bucket node initially */
iter = rcu_dereference(iter_prev->next);
- assert(iter_prev->reverse_hash <= node->reverse_hash);
+ urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash);
for (;;) {
if (caa_unlikely(is_end(iter)))
goto insert;
}
insert:
- assert(node != clear_flag(iter));
- assert(!is_removed(iter_prev));
- assert(!is_removal_owner(iter_prev));
- assert(!is_removed(iter));
- assert(!is_removal_owner(iter));
- assert(iter_prev != node);
+ urcu_posix_assert(node != clear_flag(iter));
+ urcu_posix_assert(!is_removed(iter_prev));
+ urcu_posix_assert(!is_removal_owner(iter_prev));
+ urcu_posix_assert(!is_removed(iter));
+ urcu_posix_assert(!is_removal_owner(iter));
+ urcu_posix_assert(iter_prev != node);
if (!bucket_flag)
node->next = clear_flag(iter);
else
}
gc_node:
- assert(!is_removed(iter));
- assert(!is_removal_owner(iter));
+ urcu_posix_assert(!is_removed(iter));
+ urcu_posix_assert(!is_removal_owner(iter));
if (is_bucket(iter))
new_next = flag_bucket(clear_flag(next));
else
struct cds_lfht_node *node)
{
struct cds_lfht_node *bucket, *next;
+ uintptr_t *node_next;
if (!node) /* Return -ENOENT if asked to delete NULL node */
return -ENOENT;
/* logically delete the node */
- assert(!is_bucket(node));
- assert(!is_removed(node));
- assert(!is_removal_owner(node));
+ urcu_posix_assert(!is_bucket(node));
+ urcu_posix_assert(!is_removed(node));
+ urcu_posix_assert(!is_removal_owner(node));
/*
* We are first checking if the node had previously been
next = CMM_LOAD_SHARED(node->next); /* next is not dereferenced */
if (caa_unlikely(is_removed(next)))
return -ENOENT;
- assert(!is_bucket(next));
+ urcu_posix_assert(!is_bucket(next));
/*
* The del operation semantic guarantees a full memory barrier
* before the uatomic_or atomic commit of the deletion flag.
- */
- cmm_smp_mb__before_uatomic_or();
- /*
+ *
* We set the REMOVED_FLAG unconditionally. Note that there may
* be more than one concurrent thread setting this flag.
* Knowing which wins the race will be known after the garbage
* collection phase, stay tuned!
+ *
+ * NOTE: The node_next variable is present to avoid breaking
+ * strict-aliasing rules.
*/
- uatomic_or(&node->next, REMOVED_FLAG);
+ node_next = (uintptr_t*)&node->next;
+ uatomic_or_mo(node_next, REMOVED_FLAG, CMM_RELEASE);
+
/* We performed the (logical) deletion. */
/*
bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash));
_cds_lfht_gc_bucket(bucket, node);
- assert(is_removed(CMM_LOAD_SHARED(node->next)));
+ urcu_posix_assert(is_removed(CMM_LOAD_SHARED(node->next)));
/*
* Last phase: atomically exchange node->next with a version
* having "REMOVAL_OWNER_FLAG" set. If the returned node->next
* was already set).
*/
if (!is_removal_owner(uatomic_xchg(&node->next,
- flag_removal_owner(node->next))))
+ flag_removal_owner(uatomic_load(&node->next, CMM_RELAXED)))))
return 0;
else
return -ENOENT;
{
unsigned long partition_len, start = 0;
struct partition_resize_work *work;
- int thread, ret;
- unsigned long nr_threads;
+ int ret;
+ unsigned long thread, nr_threads;
+ sigset_t newmask, oldmask;
- assert(nr_cpus_mask != -1);
+ urcu_posix_assert(nr_cpus_mask != NR_CPUS_MASK_UNINITIALIZED);
if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
goto fallback;
* partition size, up to the number of CPUs in the system.
*/
if (nr_cpus_mask > 0) {
- nr_threads = min(nr_cpus_mask + 1,
+ nr_threads = min_t(unsigned long, nr_cpus_mask + 1,
len >> MIN_PARTITION_PER_THREAD_ORDER);
} else {
nr_threads = 1;
dbg_printf("error allocating for resize, single-threading\n");
goto fallback;
}
+
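+	/*
+	 * Block all signals for this thread so that the resize worker
+	 * threads created below inherit a fully-blocked signal mask and
+	 * do not steal signals from the application; the caller's mask
+	 * is restored once the threads have been created.
+	 */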
+ ret = sigfillset(&newmask);
+ urcu_posix_assert(!ret);
+ ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+ urcu_posix_assert(!ret);
+
for (thread = 0; thread < nr_threads; thread++) {
work[thread].ht = ht;
work[thread].i = i;
work[thread].len = partition_len;
work[thread].start = thread * partition_len;
work[thread].fct = fct;
- ret = pthread_create(&(work[thread].thread_id), ht->resize_attr,
+ ret = pthread_create(&(work[thread].thread_id),
+ ht->caller_resize_attr ? &ht->resize_attr : NULL,
partition_resize_thread, &work[thread]);
if (ret == EAGAIN) {
/*
nr_threads = thread;
break;
}
- assert(!ret);
+ urcu_posix_assert(!ret);
}
+
+ ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+ urcu_posix_assert(!ret);
+
for (thread = 0; thread < nr_threads; thread++) {
ret = pthread_join(work[thread].thread_id, NULL);
- assert(!ret);
+ urcu_posix_assert(!ret);
}
free(work);
{
unsigned long j, size = 1UL << (i - 1);
- assert(i > MIN_TABLE_ORDER);
+ urcu_posix_assert(i > MIN_TABLE_ORDER);
ht->flavor->read_lock();
for (j = size + start; j < size + start + len; j++) {
struct cds_lfht_node *new_node = bucket_at(ht, j);
- assert(j >= size && j < (size << 1));
+ urcu_posix_assert(j >= size && j < (size << 1));
dbg_printf("init populate: order %lu index %lu hash %lu\n",
i, j, j);
new_node->reverse_hash = bit_reverse_ulong(j);
dbg_printf("init table: first_order %lu last_order %lu\n",
first_order, last_order);
- assert(first_order > MIN_TABLE_ORDER);
+ urcu_posix_assert(first_order > MIN_TABLE_ORDER);
for (i = first_order; i <= last_order; i++) {
unsigned long len;
/*
* Update table size.
+ *
+		 * Populate data before RCU size: this store release is
+		 * paired with the load acquire of ht->size in the lookup
+		 * and traversal paths.
*/
- cmm_smp_wmb(); /* populate data before RCU size */
- CMM_STORE_SHARED(ht->size, 1UL << i);
+ uatomic_store(&ht->size, 1UL << i, CMM_RELEASE);
dbg_printf("init new size: %lu\n", 1UL << i);
if (CMM_LOAD_SHARED(ht->in_progress_destroy))
{
unsigned long j, size = 1UL << (i - 1);
- assert(i > MIN_TABLE_ORDER);
+ urcu_posix_assert(i > MIN_TABLE_ORDER);
ht->flavor->read_lock();
for (j = size + start; j < size + start + len; j++) {
struct cds_lfht_node *fini_bucket = bucket_at(ht, j);
struct cds_lfht_node *parent_bucket = bucket_at(ht, j - size);
+ uintptr_t *fini_bucket_next;
- assert(j >= size && j < (size << 1));
+ urcu_posix_assert(j >= size && j < (size << 1));
dbg_printf("remove entry: order %lu index %lu hash %lu\n",
i, j, j);
- /* Set the REMOVED_FLAG to freeze the ->next for gc */
- uatomic_or(&fini_bucket->next, REMOVED_FLAG);
+		/*
+		 * Set the REMOVED_FLAG to freeze the ->next for gc.
+		 *
+		 * NOTE: The fini_bucket_next variable is present to
+		 * avoid breaking strict-aliasing rules.
+		 */
+ fini_bucket_next = (uintptr_t*)&fini_bucket->next;
+ uatomic_or(fini_bucket_next, REMOVED_FLAG);
_cds_lfht_gc_bucket(parent_bucket, fini_bucket);
}
ht->flavor->read_unlock();
void fini_table(struct cds_lfht *ht,
unsigned long first_order, unsigned long last_order)
{
- long i;
- unsigned long free_by_rcu_order = 0;
+ unsigned long free_by_rcu_order = 0, i;
dbg_printf("fini table: first_order %lu last_order %lu\n",
first_order, last_order);
- assert(first_order > MIN_TABLE_ORDER);
+ urcu_posix_assert(first_order > MIN_TABLE_ORDER);
for (i = last_order; i >= first_order; i--) {
unsigned long len;
}
}
+/*
+ * Never called with size < 1.
+ */
static
void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
{
struct cds_lfht_node *prev, *node;
unsigned long order, len, i;
+ int bucket_order;
cds_lfht_alloc_bucket_table(ht, 0);
node->next = flag_bucket(get_end());
node->reverse_hash = 0;
- for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) {
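+	/*
+	 * bucket_order cannot be -1 here: cds_lfht_create_bucket() is
+	 * never called with size < 1.
+	 */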
+ bucket_order = cds_lfht_get_count_order_ulong(size);
+ urcu_posix_assert(bucket_order >= 0);
+
+ for (order = 1; order < (unsigned long) bucket_order + 1; order++) {
len = 1UL << (order - 1);
cds_lfht_alloc_bucket_table(ht, order);
node->reverse_hash = bit_reverse_ulong(len + i);
/* insert after prev */
- assert(is_bucket(prev->next));
+ urcu_posix_assert(is_bucket(prev->next));
node->next = prev->next;
prev->next = flag_bucket(node);
}
}
}
+#if (CAA_BITS_PER_LONG > 32)
+/*
+ * For 64-bit architectures, when the maximum number of buckets is small
+ * enough not to use the entire 64-bit memory mapping space (while still
+ * allowing a fair number of hash table instances), use the mmap allocator,
+ * which is faster. Otherwise, fall back to the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+ if (max_nr_buckets && max_nr_buckets <= (1ULL << 32))
+ return &cds_lfht_mm_mmap;
+ else
+ return &cds_lfht_mm_order;
+}
+#else
+/*
+ * For 32-bit architectures, use the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(
+ unsigned long max_nr_buckets __attribute__((unused)))
+{
+ return &cds_lfht_mm_order;
+}
+#endif
+
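+/*
+ * Initialize a node so that cds_lfht_is_node_deleted() reports it as
+ * deleted before the node has ever been added to a hash table.
+ */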
+void cds_lfht_node_init_deleted(struct cds_lfht_node *node)
+{
+ cds_lfht_node_init(node);
+ node->next = flag_removed(NULL);
+}
+
struct cds_lfht *_cds_lfht_new(unsigned long init_size,
unsigned long min_nr_alloc_buckets,
unsigned long max_nr_buckets,
/*
* Memory management plugin default.
*/
- if (!mm) {
- if (CAA_BITS_PER_LONG > 32
- && max_nr_buckets
- && max_nr_buckets <= (1ULL << 32)) {
- /*
- * For 64-bit architectures, with max number of
- * buckets small enough not to use the entire
- * 64-bit memory mapping space (and allowing a
- * fair number of hash table instances), use the
- * mmap allocator, which is faster than the
- * order allocator.
- */
- mm = &cds_lfht_mm_mmap;
- } else {
- /*
- * The fallback is to use the order allocator.
- */
- mm = &cds_lfht_mm_order;
- }
- }
+ if (!mm)
+ mm = get_mm_type(max_nr_buckets);
/* max_nr_buckets == 0 for order based mm means infinite */
if (mm == &cds_lfht_mm_order && !max_nr_buckets)
init_size = min(init_size, max_nr_buckets);
ht = mm->alloc_cds_lfht(min_nr_alloc_buckets, max_nr_buckets);
- assert(ht);
- assert(ht->mm == mm);
- assert(ht->bucket_at == mm->bucket_at);
+ urcu_posix_assert(ht);
+ urcu_posix_assert(ht->mm == mm);
+ urcu_posix_assert(ht->bucket_at == mm->bucket_at);
ht->flags = flags;
ht->flavor = flavor;
- ht->resize_attr = attr;
+ ht->caller_resize_attr = attr;
+ if (attr)
+ ht->resize_attr = *attr;
alloc_split_items_count(ht);
/* this mutex should not nest in read-side C.S. */
pthread_mutex_init(&ht->resize_mutex, NULL);
reverse_hash = bit_reverse_ulong(hash);
- size = rcu_dereference(ht->size);
+ /*
+ * Use load acquire instead of rcu_dereference because there is no
+ * dependency between the table size and the dereference of the bucket
+ * content.
+ *
+ * This acquire is paired with the store release in init_table().
+ */
+ size = uatomic_load(&ht->size, CMM_ACQUIRE);
bucket = lookup_bucket(ht, size, hash);
/* We can always skip the bucket node initially */
node = rcu_dereference(bucket->next);
break;
}
next = rcu_dereference(node->next);
- assert(node == clear_flag(node));
+ urcu_posix_assert(node == clear_flag(node));
if (caa_likely(!is_removed(next))
&& !is_bucket(next)
&& node->reverse_hash == reverse_hash
}
node = clear_flag(next);
}
- assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+ urcu_posix_assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
iter->node = node;
iter->next = next;
}
-void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
+void cds_lfht_next_duplicate(struct cds_lfht *ht __attribute__((unused)),
+ cds_lfht_match_fct match,
const void *key, struct cds_lfht_iter *iter)
{
struct cds_lfht_node *node, *next;
}
node = clear_flag(next);
}
- assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+ urcu_posix_assert(!node || !is_bucket(uatomic_load(&node->next, CMM_RELAXED)));
iter->node = node;
iter->next = next;
}
-void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_next(struct cds_lfht *ht __attribute__((unused)),
+ struct cds_lfht_iter *iter)
{
struct cds_lfht_node *node, *next;
}
node = clear_flag(next);
}
- assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+ urcu_posix_assert(!node || !is_bucket(uatomic_load(&node->next, CMM_RELAXED)));
iter->node = node;
iter->next = next;
}
* Get next after first bucket node. The first bucket node is the
* first node of the linked list.
*/
- iter->next = bucket_at(ht, 0)->next;
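+	/*
+	 * The CMM_CONSUME load preserves the address dependency on the
+	 * bucket node pointer, like rcu_dereference().
+	 */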
+ iter->next = uatomic_load(&bucket_at(ht, 0)->next, CMM_CONSUME);
cds_lfht_next(ht, iter);
}
unsigned long size;
node->reverse_hash = bit_reverse_ulong(hash);
- size = rcu_dereference(ht->size);
+ size = uatomic_load(&ht->size, CMM_ACQUIRE);
_cds_lfht_add(ht, hash, NULL, NULL, size, node, NULL, 0);
ht_count_add(ht, size, hash);
}
struct cds_lfht_iter iter;
node->reverse_hash = bit_reverse_ulong(hash);
- size = rcu_dereference(ht->size);
+ size = uatomic_load(&ht->size, CMM_ACQUIRE);
_cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
if (iter.node == node)
ht_count_add(ht, size, hash);
struct cds_lfht_iter iter;
node->reverse_hash = bit_reverse_ulong(hash);
- size = rcu_dereference(ht->size);
+ size = uatomic_load(&ht->size, CMM_ACQUIRE);
for (;;) {
_cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
if (iter.node == node) {
return -EINVAL;
if (caa_unlikely(!match(old_iter->node, key)))
return -EINVAL;
- size = rcu_dereference(ht->size);
+ size = uatomic_load(&ht->size, CMM_ACQUIRE);
return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next,
new_node);
}
unsigned long size;
int ret;
- size = rcu_dereference(ht->size);
+ size = uatomic_load(&ht->size, CMM_ACQUIRE);
ret = _cds_lfht_del(ht, size, node);
if (!ret) {
unsigned long hash;
return ret;
}
-int cds_lfht_is_node_deleted(struct cds_lfht_node *node)
+int cds_lfht_is_node_deleted(const struct cds_lfht_node *node)
{
return is_removed(CMM_LOAD_SHARED(node->next));
}
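+
+/*
+ * Check whether the table is empty by walking the chain from the first
+ * bucket node: any non-bucket node found before the end marker means
+ * the table still holds entries.
+ */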
+static
+bool cds_lfht_is_empty(struct cds_lfht *ht)
+{
+ struct cds_lfht_node *node, *next;
+ bool empty = true;
+ bool was_online;
+
+ was_online = ht->flavor->read_ongoing();
+ if (!was_online) {
+ ht->flavor->thread_online();
+ ht->flavor->read_lock();
+ }
+ /* Check that the table is empty */
+ node = bucket_at(ht, 0);
+ do {
+ next = rcu_dereference(node->next);
+ if (!is_bucket(next)) {
+ empty = false;
+ break;
+ }
+ node = clear_flag(next);
+ } while (!is_end(node));
+ if (!was_online) {
+ ht->flavor->read_unlock();
+ ht->flavor->thread_offline();
+ }
+ return empty;
+}
+
static
int cds_lfht_delete_bucket(struct cds_lfht *ht)
{
node = clear_flag(node)->next;
if (!is_bucket(node))
return -EPERM;
- assert(!is_removed(node));
- assert(!is_removal_owner(node));
+ urcu_posix_assert(!is_removed(node));
+ urcu_posix_assert(!is_removal_owner(node));
} while (!is_end(node));
/*
* size accessed without rcu_dereference because hash table is
node = bucket_at(ht, i);
dbg_printf("delete bucket: index %lu expected hash %lu hash %lu\n",
i, i, bit_reverse_ulong(node->reverse_hash));
- assert(is_bucket(node->next));
+ urcu_posix_assert(is_bucket(node->next));
}
for (order = cds_lfht_get_count_order_ulong(size); (long)order >= 0; order--)
return 0;
}
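+
+/*
+ * Deferred teardown for CDS_LFHT_AUTO_RESIZE tables, run on the worker
+ * thread after previously queued resize operations have completed.
+ */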
+static
+void do_auto_resize_destroy_cb(struct urcu_work *work)
+{
+ struct cds_lfht *ht = caa_container_of(work, struct cds_lfht, destroy_work);
+ int ret;
+
+ ht->flavor->register_thread();
+ ret = cds_lfht_delete_bucket(ht);
+ if (ret)
+ urcu_die(-ret);
+ free_split_items_count(ht);
+ ret = pthread_mutex_destroy(&ht->resize_mutex);
+ if (ret)
+ urcu_die(ret);
+ ht->flavor->unregister_thread();
+ poison_free(ht);
+}
+
/*
* Should only be called when no more concurrent readers nor writers can
* possibly access the table.
int ret;
if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+ /*
+		 * Check for emptiness before queuing the destroy work, so
+		 * that an error can be returned to the caller. This check
+		 * runs concurrently with ongoing resize operations.
+ */
+ if (!cds_lfht_is_empty(ht))
+ return -EPERM;
/* Cancel ongoing resize operations. */
- _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
- /* Wait for in-flight resize operations to complete */
- urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+ uatomic_store(&ht->in_progress_destroy, 1, CMM_RELAXED);
+ if (attr) {
+ *attr = ht->caller_resize_attr;
+ ht->caller_resize_attr = NULL;
+ }
+ /*
+ * Queue destroy work after prior queued resize
+ * operations. Given there are no concurrent writers
+ * accessing the hash table at this point, no resize
+ * operations can be queued after this destroy work.
+ */
+ urcu_workqueue_queue_work(cds_lfht_workqueue,
+ &ht->destroy_work, do_auto_resize_destroy_cb);
+ return 0;
}
ret = cds_lfht_delete_bucket(ht);
if (ret)
return ret;
free_split_items_count(ht);
if (attr)
- *attr = ht->resize_attr;
+ *attr = ht->caller_resize_attr;
ret = pthread_mutex_destroy(&ht->resize_mutex);
if (ret)
ret = -EBUSY;
- if (ht->flags & CDS_LFHT_AUTO_RESIZE)
- cds_lfht_fini_worker(ht->flavor);
poison_free(ht);
return ret;
}
new_order = cds_lfht_get_count_order_ulong(new_size);
dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
old_size, old_order, new_size, new_order);
- assert(new_size > old_size);
+ urcu_posix_assert(new_size > old_size);
init_table(ht, old_order + 1, new_order);
}
new_order = cds_lfht_get_count_order_ulong(new_size);
dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
old_size, old_order, new_size, new_order);
- assert(new_size < old_size);
+ urcu_posix_assert(new_size < old_size);
/* Remove and unlink all bucket nodes to remove. */
fini_table(ht, new_order + 1, old_order);
* Resize table, re-do if the target size has changed under us.
*/
do {
- if (CMM_LOAD_SHARED(ht->in_progress_destroy))
+ if (uatomic_load(&ht->in_progress_destroy, CMM_RELAXED))
break;
- ht->resize_initiated = 1;
+
+ uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
+
old_size = ht->size;
- new_size = CMM_LOAD_SHARED(ht->resize_target);
+ new_size = uatomic_load(&ht->resize_target, CMM_RELAXED);
if (old_size < new_size)
_do_cds_lfht_grow(ht, old_size, new_size);
else if (old_size > new_size)
_do_cds_lfht_shrink(ht, old_size, new_size);
- ht->resize_initiated = 0;
+
+ uatomic_store(&ht->resize_initiated, 0, CMM_RELAXED);
/* write resize_initiated before read resize_target */
cmm_smp_mb();
- } while (ht->size != CMM_LOAD_SHARED(ht->resize_target));
+ } while (ht->size != uatomic_load(&ht->resize_target, CMM_RELAXED));
}
static
void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
{
resize_target_update_count(ht, new_size);
- CMM_STORE_SHARED(ht->resize_initiated, 1);
+
+ /*
+	 * Set the flag as early as possible, even in the contended case.
+ */
+ uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
+
mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
mutex_unlock(&ht->resize_mutex);
{
struct resize_work *work;
- /* Store resize_target before read resize_initiated */
- cmm_smp_mb();
- if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
- if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
+ /*
+	 * The store to resize_target is ordered before the load of
+	 * resize_initiated, as guaranteed by either cmpxchg or
+	 * _uatomic_xchg_monotonic_increase.
+ */
+ if (!uatomic_load(&ht->resize_initiated, CMM_RELAXED)) {
+ if (uatomic_load(&ht->in_progress_destroy, CMM_RELAXED)) {
return;
}
work = malloc(sizeof(*work));
work->ht = ht;
urcu_workqueue_queue_work(cds_lfht_workqueue,
&work->work, do_resize_cb);
- CMM_STORE_SHARED(ht->resize_initiated, 1);
+ uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
}
}
__cds_lfht_resize_lazy_launch(ht);
}
-static void cds_lfht_before_fork(void *priv)
+static void cds_lfht_before_fork(void *priv __attribute__((unused)))
{
if (cds_lfht_workqueue_atfork_nesting++)
return;
urcu_workqueue_pause_worker(cds_lfht_workqueue);
}
-static void cds_lfht_after_fork_parent(void *priv)
+static void cds_lfht_after_fork_parent(void *priv __attribute__((unused)))
{
if (--cds_lfht_workqueue_atfork_nesting)
return;
mutex_unlock(&cds_lfht_fork_mutex);
}
-static void cds_lfht_after_fork_child(void *priv)
+static void cds_lfht_after_fork_child(void *priv __attribute__((unused)))
{
if (--cds_lfht_workqueue_atfork_nesting)
return;
.after_fork_child = cds_lfht_after_fork_child,
};
-/* Block all signals to ensure we don't disturb the application. */
-static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
- void *priv)
-{
- int ret;
- sigset_t mask;
-
- /* Block signal for entire process, so only our thread processes it. */
- ret = sigfillset(&mask);
- if (ret)
- urcu_die(errno);
- ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
- if (ret)
- urcu_die(ret);
-}
-
static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
{
flavor->register_rculfhash_atfork(&cds_lfht_atfork);
mutex_lock(&cds_lfht_fork_mutex);
- if (cds_lfht_workqueue_user_count++)
- goto end;
- cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
- NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
-end:
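+	/*
+	 * Create the workqueue on first use; it is destroyed by the
+	 * cds_lfht_exit() library destructor.
+	 */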
+ if (!cds_lfht_workqueue)
+ cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+ NULL, NULL, NULL, NULL, NULL, NULL, NULL);
mutex_unlock(&cds_lfht_fork_mutex);
}
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+static void cds_lfht_exit(void)
{
mutex_lock(&cds_lfht_fork_mutex);
- if (--cds_lfht_workqueue_user_count)
- goto end;
- urcu_workqueue_destroy(cds_lfht_workqueue);
- cds_lfht_workqueue = NULL;
-end:
+ if (cds_lfht_workqueue) {
+ urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+ urcu_workqueue_destroy(cds_lfht_workqueue);
+ cds_lfht_workqueue = NULL;
+ }
mutex_unlock(&cds_lfht_fork_mutex);
-
- flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
}