X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=src%2Frculfhash.c;h=04fd49946aa940184015a4921971feff5d3818e5;hp=d7a1f23bf8f766040341acbd18584246b5c327e4;hb=014775106c60f02818ca755b331f887030bd440f;hpb=6893800a4d1cc14dff0395ddcd660a5138db183d diff --git a/src/rculfhash.c b/src/rculfhash.c index d7a1f23..04fd499 100644 --- a/src/rculfhash.c +++ b/src/rculfhash.c @@ -64,7 +64,7 @@ * - Split-counters are used to keep track of the number of * nodes within the hash table for automatic resize triggering. * - Resize operation initiated by long chain detection is executed by a - * call_rcu thread, which keeps lock-freedom of add and remove. + * worker thread, which keeps lock-freedom of add and remove. * - Resize operations are protected by a mutex. * - The removal operation is split in two parts: first, a "removed" * flag is set in the next pointer within the node to remove. Then, @@ -258,7 +258,6 @@ #define _LGPL_SOURCE #include #include -#include #include #include #include @@ -266,16 +265,22 @@ #include #include "compat-getcpu.h" -#include -#include -#include +#include +#include +#include +#include #include #include #include #include +#include #include #include #include +#include +#include "workqueue.h" +#include "urcu-die.h" +#include "urcu-utils.h" /* * Split-counters lazily update the global counter each 1024 @@ -335,11 +340,11 @@ struct ht_items_count { } __attribute__((aligned(CAA_CACHE_LINE_SIZE))); /* - * rcu_resize_work: Contains arguments passed to RCU worker thread + * resize_work: Contains arguments passed to worker thread * responsible for performing lazy resize. */ -struct rcu_resize_work { - struct rcu_head head; +struct resize_work { + struct urcu_work work; struct cds_lfht *ht; }; @@ -356,6 +361,49 @@ struct partition_resize_work { unsigned long start, unsigned long len); }; +static struct urcu_workqueue *cds_lfht_workqueue; +static unsigned long cds_lfht_workqueue_user_count; + +/* + * Mutex ensuring mutual exclusion between workqueue initialization and + * fork handlers. cds_lfht_fork_mutex nests inside call_rcu_mutex. + */ +static pthread_mutex_t cds_lfht_fork_mutex = PTHREAD_MUTEX_INITIALIZER; + +static struct urcu_atfork cds_lfht_atfork; + +/* + * atfork handler nesting counters. Handle being registered to many urcu + * flavors, thus being possibly invoked more than once in the + * pthread_atfork list of callbacks. + */ +static int cds_lfht_workqueue_atfork_nesting; + +static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor); +static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor); + +#ifdef CONFIG_CDS_LFHT_ITER_DEBUG + +static +void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter) +{ + iter->lfht = ht; +} + +#define cds_lfht_iter_debug_assert(...) urcu_posix_assert(__VA_ARGS__) + +#else + +static +void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht __attribute__((unused)), + struct cds_lfht_iter *iter __attribute__((unused))) +{ +} + +#define cds_lfht_iter_debug_assert(...) + +#endif + /* * Algorithm to reverse bits in a word by lookup table, extended to * 64-bit words. @@ -420,7 +468,7 @@ unsigned long bit_reverse_ulong(unsigned long v) * Returns 0 if no bit is set, else returns the position of the most * significant bit (from 1 to 32 on 32-bit, from 1 to 64 on 64-bit). 
*/ -#if defined(__i386) || defined(__x86_64) +#if defined(URCU_ARCH_X86) static inline unsigned int fls_u32(uint32_t x) { @@ -436,7 +484,7 @@ unsigned int fls_u32(uint32_t x) #define HAS_FLS_U32 #endif -#if defined(__x86_64) +#if defined(URCU_ARCH_AMD64) static inline unsigned int fls_u64(uint64_t x) { @@ -534,6 +582,7 @@ unsigned int cds_lfht_fls_ulong(unsigned long x) * Return the minimum order for which x <= (1UL << order). * Return -1 if x is 0. */ +static int cds_lfht_get_count_order_u32(uint32_t x) { if (!x) @@ -561,6 +610,37 @@ static void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size, unsigned long count); +static void mutex_lock(pthread_mutex_t *mutex) +{ + int ret; + +#ifndef DISTRUST_SIGNALS_EXTREME + ret = pthread_mutex_lock(mutex); + if (ret) + urcu_die(ret); +#else /* #ifndef DISTRUST_SIGNALS_EXTREME */ + while ((ret = pthread_mutex_trylock(mutex)) != 0) { + if (ret != EBUSY && ret != EINTR) + urcu_die(ret); + if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) { + cmm_smp_mb(); + _CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0); + cmm_smp_mb(); + } + (void) poll(NULL, 0, 10); + } +#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */ +} + +static void mutex_unlock(pthread_mutex_t *mutex) +{ + int ret; + + ret = pthread_mutex_unlock(mutex); + if (ret) + urcu_die(ret); +} + static long nr_cpus_mask = -1; static long split_count_mask = -1; static int split_count_order = -1; @@ -602,12 +682,12 @@ void alloc_split_items_count(struct cds_lfht *ht) cds_lfht_get_count_order_ulong(split_count_mask + 1); } - assert(split_count_mask >= 0); + urcu_posix_assert(split_count_mask >= 0); if (ht->flags & CDS_LFHT_ACCOUNTING) { ht->split_count = calloc(split_count_mask + 1, sizeof(struct ht_items_count)); - assert(ht->split_count); + urcu_posix_assert(ht->split_count); } else { ht->split_count = NULL; } @@ -624,7 +704,7 @@ int ht_get_split_count_index(unsigned long hash) { int cpu; - assert(split_count_mask >= 0); + urcu_posix_assert(split_count_mask >= 0); cpu = urcu_sched_getcpu(); if (caa_unlikely(cpu < 0)) return hash & split_count_mask; @@ -635,9 +715,8 @@ int ht_get_split_count_index(unsigned long hash) static void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash) { - unsigned long split_count; + unsigned long split_count, count; int index; - long count; if (caa_unlikely(!ht->split_count)) return; @@ -656,7 +735,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash) if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size) return; - dbg_printf("add set global %ld\n", count); + dbg_printf("add set global %lu\n", count); cds_lfht_resize_lazy_count(ht, size, count >> (CHAIN_LEN_TARGET - 1)); } @@ -664,9 +743,8 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash) static void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash) { - unsigned long split_count; + unsigned long split_count, count; int index; - long count; if (caa_unlikely(!ht->split_count)) return; @@ -750,7 +828,7 @@ struct cds_lfht_node *clear_flag(struct cds_lfht_node *node) } static -int is_removed(struct cds_lfht_node *node) +int is_removed(const struct cds_lfht_node *node) { return ((unsigned long) node) & REMOVED_FLAG; } @@ -839,7 +917,7 @@ static inline struct cds_lfht_node *lookup_bucket(struct cds_lfht *ht, unsigned long size, unsigned long hash) { - assert(size > 0); + urcu_posix_assert(size > 0); return bucket_at(ht, hash & (size - 1)); } @@ -851,26 +929,26 @@ void _cds_lfht_gc_bucket(struct 
cds_lfht_node *bucket, struct cds_lfht_node *nod { struct cds_lfht_node *iter_prev, *iter, *next, *new_next; - assert(!is_bucket(bucket)); - assert(!is_removed(bucket)); - assert(!is_removal_owner(bucket)); - assert(!is_bucket(node)); - assert(!is_removed(node)); - assert(!is_removal_owner(node)); + urcu_posix_assert(!is_bucket(bucket)); + urcu_posix_assert(!is_removed(bucket)); + urcu_posix_assert(!is_removal_owner(bucket)); + urcu_posix_assert(!is_bucket(node)); + urcu_posix_assert(!is_removed(node)); + urcu_posix_assert(!is_removal_owner(node)); for (;;) { iter_prev = bucket; /* We can always skip the bucket node initially */ iter = rcu_dereference(iter_prev->next); - assert(!is_removed(iter)); - assert(!is_removal_owner(iter)); - assert(iter_prev->reverse_hash <= node->reverse_hash); + urcu_posix_assert(!is_removed(iter)); + urcu_posix_assert(!is_removal_owner(iter)); + urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash); /* * We should never be called with bucket (start of chain) * and logically removed node (end of path compression * marker) being the actual same node. This would be a * bug in the algorithm implementation. */ - assert(bucket != node); + urcu_posix_assert(bucket != node); for (;;) { if (caa_unlikely(is_end(iter))) return; @@ -882,8 +960,8 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod iter_prev = clear_flag(iter); iter = next; } - assert(!is_removed(iter)); - assert(!is_removal_owner(iter)); + urcu_posix_assert(!is_removed(iter)); + urcu_posix_assert(!is_removal_owner(iter)); if (is_bucket(iter)) new_next = flag_bucket(clear_flag(next)); else @@ -903,13 +981,13 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, if (!old_node) /* Return -ENOENT if asked to replace NULL node */ return -ENOENT; - assert(!is_removed(old_node)); - assert(!is_removal_owner(old_node)); - assert(!is_bucket(old_node)); - assert(!is_removed(new_node)); - assert(!is_removal_owner(new_node)); - assert(!is_bucket(new_node)); - assert(new_node != old_node); + urcu_posix_assert(!is_removed(old_node)); + urcu_posix_assert(!is_removal_owner(old_node)); + urcu_posix_assert(!is_bucket(old_node)); + urcu_posix_assert(!is_removed(new_node)); + urcu_posix_assert(!is_removal_owner(new_node)); + urcu_posix_assert(!is_bucket(new_node)); + urcu_posix_assert(new_node != old_node); for (;;) { /* Insert after node to be replaced */ if (is_removed(old_next)) { @@ -919,14 +997,14 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, */ return -ENOENT; } - assert(old_next == clear_flag(old_next)); - assert(new_node != old_next); + urcu_posix_assert(old_next == clear_flag(old_next)); + urcu_posix_assert(new_node != old_next); /* * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED * flag. It is either set atomically at the same time * (replace) or after (del). 
*/ - assert(!is_removal_owner(old_next)); + urcu_posix_assert(!is_removal_owner(old_next)); new_node->next = old_next; /* * Here is the whole trick for lock-free replace: we add @@ -958,7 +1036,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash)); _cds_lfht_gc_bucket(bucket, new_node); - assert(is_removed(CMM_LOAD_SHARED(old_node->next))); + urcu_posix_assert(is_removed(CMM_LOAD_SHARED(old_node->next))); return 0; } @@ -980,9 +1058,9 @@ void _cds_lfht_add(struct cds_lfht *ht, *return_node; struct cds_lfht_node *bucket; - assert(!is_bucket(node)); - assert(!is_removed(node)); - assert(!is_removal_owner(node)); + urcu_posix_assert(!is_bucket(node)); + urcu_posix_assert(!is_removed(node)); + urcu_posix_assert(!is_removal_owner(node)); bucket = lookup_bucket(ht, size, hash); for (;;) { uint32_t chain_len = 0; @@ -994,7 +1072,7 @@ void _cds_lfht_add(struct cds_lfht *ht, iter_prev = bucket; /* We can always skip the bucket node initially */ iter = rcu_dereference(iter_prev->next); - assert(iter_prev->reverse_hash <= node->reverse_hash); + urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash); for (;;) { if (caa_unlikely(is_end(iter))) goto insert; @@ -1013,7 +1091,13 @@ void _cds_lfht_add(struct cds_lfht *ht, if (unique_ret && !is_bucket(next) && clear_flag(iter)->reverse_hash == node->reverse_hash) { - struct cds_lfht_iter d_iter = { .node = node, .next = iter, }; + struct cds_lfht_iter d_iter = { + .node = node, + .next = iter, +#ifdef CONFIG_CDS_LFHT_ITER_DEBUG + .lfht = ht, +#endif + }; /* * uniquely adding inserts the node as the first @@ -1041,12 +1125,12 @@ void _cds_lfht_add(struct cds_lfht *ht, } insert: - assert(node != clear_flag(iter)); - assert(!is_removed(iter_prev)); - assert(!is_removal_owner(iter_prev)); - assert(!is_removed(iter)); - assert(!is_removal_owner(iter)); - assert(iter_prev != node); + urcu_posix_assert(node != clear_flag(iter)); + urcu_posix_assert(!is_removed(iter_prev)); + urcu_posix_assert(!is_removal_owner(iter_prev)); + urcu_posix_assert(!is_removed(iter)); + urcu_posix_assert(!is_removal_owner(iter)); + urcu_posix_assert(iter_prev != node); if (!bucket_flag) node->next = clear_flag(iter); else @@ -1064,8 +1148,8 @@ void _cds_lfht_add(struct cds_lfht *ht, } gc_node: - assert(!is_removed(iter)); - assert(!is_removal_owner(iter)); + urcu_posix_assert(!is_removed(iter)); + urcu_posix_assert(!is_removal_owner(iter)); if (is_bucket(iter)) new_next = flag_bucket(clear_flag(next)); else @@ -1090,9 +1174,9 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, return -ENOENT; /* logically delete the node */ - assert(!is_bucket(node)); - assert(!is_removed(node)); - assert(!is_removal_owner(node)); + urcu_posix_assert(!is_bucket(node)); + urcu_posix_assert(!is_removed(node)); + urcu_posix_assert(!is_removal_owner(node)); /* * We are first checking if the node had previously been @@ -1103,7 +1187,7 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, next = CMM_LOAD_SHARED(node->next); /* next is not dereferenced */ if (caa_unlikely(is_removed(next))) return -ENOENT; - assert(!is_bucket(next)); + urcu_posix_assert(!is_bucket(next)); /* * The del operation semantic guarantees a full memory barrier * before the uatomic_or atomic commit of the deletion flag. 
@@ -1126,7 +1210,7 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash)); _cds_lfht_gc_bucket(bucket, node); - assert(is_removed(CMM_LOAD_SHARED(node->next))); + urcu_posix_assert(is_removed(CMM_LOAD_SHARED(node->next))); /* * Last phase: atomically exchange node->next with a version * having "REMOVAL_OWNER_FLAG" set. If the returned node->next @@ -1165,10 +1249,10 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, { unsigned long partition_len, start = 0; struct partition_resize_work *work; - int thread, ret; - unsigned long nr_threads; + int ret; + unsigned long thread, nr_threads; - assert(nr_cpus_mask != -1); + urcu_posix_assert(nr_cpus_mask != -1); if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) goto fallback; @@ -1178,7 +1262,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, * partition size, up to the number of CPUs in the system. */ if (nr_cpus_mask > 0) { - nr_threads = min(nr_cpus_mask + 1, + nr_threads = min_t(unsigned long, nr_cpus_mask + 1, len >> MIN_PARTITION_PER_THREAD_ORDER); } else { nr_threads = 1; @@ -1208,11 +1292,11 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, nr_threads = thread; break; } - assert(!ret); + urcu_posix_assert(!ret); } for (thread = 0; thread < nr_threads; thread++) { ret = pthread_join(work[thread].thread_id, NULL); - assert(!ret); + urcu_posix_assert(!ret); } free(work); @@ -1224,14 +1308,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, if (start == 0 && nr_threads > 0) return; fallback: - ht->flavor->thread_online(); fct(ht, i, start, len); - ht->flavor->thread_offline(); } /* * Holding RCU read lock to protect _cds_lfht_add against memory - * reclaim that could be performed by other call_rcu worker threads (ABA + * reclaim that could be performed by other worker threads (ABA * problem). * * When we reach a certain length, we can split this population phase over @@ -1246,12 +1328,12 @@ void init_table_populate_partition(struct cds_lfht *ht, unsigned long i, { unsigned long j, size = 1UL << (i - 1); - assert(i > MIN_TABLE_ORDER); + urcu_posix_assert(i > MIN_TABLE_ORDER); ht->flavor->read_lock(); for (j = size + start; j < size + start + len; j++) { struct cds_lfht_node *new_node = bucket_at(ht, j); - assert(j >= size && j < (size << 1)); + urcu_posix_assert(j >= size && j < (size << 1)); dbg_printf("init populate: order %lu index %lu hash %lu\n", i, j, j); new_node->reverse_hash = bit_reverse_ulong(j); @@ -1275,7 +1357,7 @@ void init_table(struct cds_lfht *ht, dbg_printf("init table: first_order %lu last_order %lu\n", first_order, last_order); - assert(first_order > MIN_TABLE_ORDER); + urcu_posix_assert(first_order > MIN_TABLE_ORDER); for (i = first_order; i <= last_order; i++) { unsigned long len; @@ -1308,7 +1390,7 @@ void init_table(struct cds_lfht *ht, /* * Holding RCU read lock to protect _cds_lfht_remove against memory - * reclaim that could be performed by other call_rcu worker threads (ABA + * reclaim that could be performed by other worker threads (ABA * problem). * For a single level, we logically remove and garbage collect each node. * @@ -1320,8 +1402,9 @@ void init_table(struct cds_lfht *ht, * * Concurrent removal and add operations are helping us perform garbage * collection of logically removed nodes. 
We guarantee that all logically - * removed nodes have been garbage-collected (unlinked) before call_rcu is - * invoked to free a hole level of bucket nodes (after a grace period). + * removed nodes have been garbage-collected (unlinked) before work + * enqueue is invoked to free a hole level of bucket nodes (after a + * grace period). * * Logical removal and garbage collection can therefore be done in batch * or on a node-per-node basis, as long as the guarantee above holds. @@ -1337,13 +1420,13 @@ void remove_table_partition(struct cds_lfht *ht, unsigned long i, { unsigned long j, size = 1UL << (i - 1); - assert(i > MIN_TABLE_ORDER); + urcu_posix_assert(i > MIN_TABLE_ORDER); ht->flavor->read_lock(); for (j = size + start; j < size + start + len; j++) { struct cds_lfht_node *fini_bucket = bucket_at(ht, j); struct cds_lfht_node *parent_bucket = bucket_at(ht, j - size); - assert(j >= size && j < (size << 1)); + urcu_posix_assert(j >= size && j < (size << 1)); dbg_printf("remove entry: order %lu index %lu hash %lu\n", i, j, j); /* Set the REMOVED_FLAG to freeze the ->next for gc */ @@ -1368,12 +1451,11 @@ static void fini_table(struct cds_lfht *ht, unsigned long first_order, unsigned long last_order) { - long i; - unsigned long free_by_rcu_order = 0; + unsigned long free_by_rcu_order = 0, i; dbg_printf("fini table: first_order %lu last_order %lu\n", first_order, last_order); - assert(first_order > MIN_TABLE_ORDER); + urcu_posix_assert(first_order > MIN_TABLE_ORDER); for (i = last_order; i >= first_order; i--) { unsigned long len; @@ -1418,11 +1500,15 @@ void fini_table(struct cds_lfht *ht, } } +/* + * Never called with size < 1. + */ static void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size) { struct cds_lfht_node *prev, *node; unsigned long order, len, i; + int bucket_order; cds_lfht_alloc_bucket_table(ht, 0); @@ -1431,7 +1517,10 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size) node->next = flag_bucket(get_end()); node->reverse_hash = 0; - for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) { + bucket_order = cds_lfht_get_count_order_ulong(size); + urcu_posix_assert(bucket_order >= 0); + + for (order = 1; order < (unsigned long) bucket_order + 1; order++) { len = 1UL << (order - 1); cds_lfht_alloc_bucket_table(ht, order); @@ -1455,13 +1544,40 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size) node->reverse_hash = bit_reverse_ulong(len + i); /* insert after prev */ - assert(is_bucket(prev->next)); + urcu_posix_assert(is_bucket(prev->next)); node->next = prev->next; prev->next = flag_bucket(node); } } } +#if (CAA_BITS_PER_LONG > 32) +/* + * For 64-bit architectures, with max number of buckets small enough not to + * use the entire 64-bit memory mapping space (and allowing a fair number of + * hash table instances), use the mmap allocator, which is faster. Otherwise, + * fallback to the order allocator. + */ +static +const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets) +{ + if (max_nr_buckets && max_nr_buckets <= (1ULL << 32)) + return &cds_lfht_mm_mmap; + else + return &cds_lfht_mm_order; +} +#else +/* + * For 32-bit architectures, use the order allocator. 
+ */ +static +const struct cds_lfht_mm_type *get_mm_type( + unsigned long max_nr_buckets __attribute__((unused))) +{ + return &cds_lfht_mm_order; +} +#endif + struct cds_lfht *_cds_lfht_new(unsigned long init_size, unsigned long min_nr_alloc_buckets, unsigned long max_nr_buckets, @@ -1484,26 +1600,8 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size, /* * Memory management plugin default. */ - if (!mm) { - if (CAA_BITS_PER_LONG > 32 - && max_nr_buckets - && max_nr_buckets <= (1ULL << 32)) { - /* - * For 64-bit architectures, with max number of - * buckets small enough not to use the entire - * 64-bit memory mapping space (and allowing a - * fair number of hash table instances), use the - * mmap allocator, which is faster than the - * order allocator. - */ - mm = &cds_lfht_mm_mmap; - } else { - /* - * The fallback is to use the order allocator. - */ - mm = &cds_lfht_mm_order; - } - } + if (!mm) + mm = get_mm_type(max_nr_buckets); /* max_nr_buckets == 0 for order based mm means infinite */ if (mm == &cds_lfht_mm_order && !max_nr_buckets) @@ -1513,15 +1611,18 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size, if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1))) return NULL; + if (flags & CDS_LFHT_AUTO_RESIZE) + cds_lfht_init_worker(flavor); + min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE); init_size = max(init_size, MIN_TABLE_SIZE); max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets); init_size = min(init_size, max_nr_buckets); ht = mm->alloc_cds_lfht(min_nr_alloc_buckets, max_nr_buckets); - assert(ht); - assert(ht->mm == mm); - assert(ht->bucket_at == mm->bucket_at); + urcu_posix_assert(ht); + urcu_posix_assert(ht->mm == mm); + urcu_posix_assert(ht->bucket_at == mm->bucket_at); ht->flags = flags; ht->flavor = flavor; @@ -1543,6 +1644,8 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash, struct cds_lfht_node *node, *next, *bucket; unsigned long reverse_hash, size; + cds_lfht_iter_debug_set_ht(ht, iter); + reverse_hash = bit_reverse_ulong(hash); size = rcu_dereference(ht->size); @@ -1560,7 +1663,7 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash, break; } next = rcu_dereference(node->next); - assert(node == clear_flag(node)); + urcu_posix_assert(node == clear_flag(node)); if (caa_likely(!is_removed(next)) && !is_bucket(next) && node->reverse_hash == reverse_hash @@ -1569,17 +1672,19 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash, } node = clear_flag(next); } - assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); + urcu_posix_assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } -void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match, +void cds_lfht_next_duplicate(struct cds_lfht *ht __attribute__((unused)), + cds_lfht_match_fct match, const void *key, struct cds_lfht_iter *iter) { struct cds_lfht_node *node, *next; unsigned long reverse_hash; + cds_lfht_iter_debug_assert(ht == iter->lfht); node = iter->node; reverse_hash = node->reverse_hash; next = iter->next; @@ -1602,15 +1707,17 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match, } node = clear_flag(next); } - assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); + urcu_posix_assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } -void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter) +void cds_lfht_next(struct cds_lfht *ht __attribute__((unused)), + struct cds_lfht_iter *iter) { struct 
cds_lfht_node *node, *next; + cds_lfht_iter_debug_assert(ht == iter->lfht); node = clear_flag(iter->next); for (;;) { if (caa_unlikely(is_end(node))) { @@ -1624,13 +1731,14 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter) } node = clear_flag(next); } - assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); + urcu_posix_assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter) { + cds_lfht_iter_debug_set_ht(ht, iter); /* * Get next after first bucket node. The first bucket node is the * first node of the linked list. @@ -1727,7 +1835,7 @@ int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node) return ret; } -int cds_lfht_is_node_deleted(struct cds_lfht_node *node) +int cds_lfht_is_node_deleted(const struct cds_lfht_node *node) { return is_removed(CMM_LOAD_SHARED(node->next)); } @@ -1744,8 +1852,8 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) node = clear_flag(node)->next; if (!is_bucket(node)) return -EPERM; - assert(!is_removed(node)); - assert(!is_removal_owner(node)); + urcu_posix_assert(!is_removed(node)); + urcu_posix_assert(!is_removal_owner(node)); } while (!is_end(node)); /* * size accessed without rcu_dereference because hash table is @@ -1757,7 +1865,7 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) node = bucket_at(ht, i); dbg_printf("delete bucket: index %lu expected hash %lu hash %lu\n", i, i, bit_reverse_ulong(node->reverse_hash)); - assert(is_bucket(node->next)); + urcu_posix_assert(is_bucket(node->next)); } for (order = cds_lfht_get_count_order_ulong(size); (long)order >= 0; order--) @@ -1772,25 +1880,14 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) */ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) { - int ret, was_online; - - /* Wait for in-flight resize operations to complete */ - _CMM_STORE_SHARED(ht->in_progress_destroy, 1); - cmm_smp_mb(); /* Store destroy before load resize */ - was_online = ht->flavor->read_ongoing(); - if (was_online) - ht->flavor->thread_offline(); - /* Calling with RCU read-side held is an error. */ - if (ht->flavor->read_ongoing()) { - ret = -EINVAL; - if (was_online) - ht->flavor->thread_online(); - goto end; + int ret; + + if (ht->flags & CDS_LFHT_AUTO_RESIZE) { + /* Cancel ongoing resize operations. 
*/ + _CMM_STORE_SHARED(ht->in_progress_destroy, 1); + /* Wait for in-flight resize operations to complete */ + urcu_workqueue_flush_queued_work(cds_lfht_workqueue); } - while (uatomic_read(&ht->in_progress_resize)) - poll(NULL, 0, 100); /* wait for 100ms */ - if (was_online) - ht->flavor->thread_online(); ret = cds_lfht_delete_bucket(ht); if (ret) return ret; @@ -1800,8 +1897,9 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) ret = pthread_mutex_destroy(&ht->resize_mutex); if (ret) ret = -EBUSY; + if (ht->flags & CDS_LFHT_AUTO_RESIZE) + cds_lfht_fini_worker(ht->flavor); poison_free(ht); -end: return ret; } @@ -1864,7 +1962,7 @@ void _do_cds_lfht_grow(struct cds_lfht *ht, new_order = cds_lfht_get_count_order_ulong(new_size); dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n", old_size, old_order, new_size, new_order); - assert(new_size > old_size); + urcu_posix_assert(new_size > old_size); init_table(ht, old_order + 1, new_order); } @@ -1880,7 +1978,7 @@ void _do_cds_lfht_shrink(struct cds_lfht *ht, new_order = cds_lfht_get_count_order_ulong(new_size); dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n", old_size, old_order, new_size, new_order); - assert(new_size < old_size); + urcu_posix_assert(new_size < old_size); /* Remove and unlink all bucket nodes to remove. */ fini_table(ht, new_order + 1, old_order); @@ -1897,7 +1995,6 @@ void _do_cds_lfht_resize(struct cds_lfht *ht) * Resize table, re-do if the target size has changed under us. */ do { - assert(uatomic_read(&ht->in_progress_resize)); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) break; ht->resize_initiated = 1; @@ -1930,71 +2027,47 @@ void resize_target_update_count(struct cds_lfht *ht, void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size) { - int was_online; - - was_online = ht->flavor->read_ongoing(); - if (was_online) - ht->flavor->thread_offline(); - /* Calling with RCU read-side held is an error. 
*/ - if (ht->flavor->read_ongoing()) { - static int print_once; - - if (!CMM_LOAD_SHARED(print_once)) - fprintf(stderr, "[error] rculfhash: cds_lfht_resize " - "called with RCU read-side lock held.\n"); - CMM_STORE_SHARED(print_once, 1); - assert(0); - goto end; - } resize_target_update_count(ht, new_size); CMM_STORE_SHARED(ht->resize_initiated, 1); - pthread_mutex_lock(&ht->resize_mutex); + mutex_lock(&ht->resize_mutex); _do_cds_lfht_resize(ht); - pthread_mutex_unlock(&ht->resize_mutex); -end: - if (was_online) - ht->flavor->thread_online(); + mutex_unlock(&ht->resize_mutex); } static -void do_resize_cb(struct rcu_head *head) +void do_resize_cb(struct urcu_work *work) { - struct rcu_resize_work *work = - caa_container_of(head, struct rcu_resize_work, head); - struct cds_lfht *ht = work->ht; + struct resize_work *resize_work = + caa_container_of(work, struct resize_work, work); + struct cds_lfht *ht = resize_work->ht; - ht->flavor->thread_offline(); - pthread_mutex_lock(&ht->resize_mutex); + ht->flavor->register_thread(); + mutex_lock(&ht->resize_mutex); _do_cds_lfht_resize(ht); - pthread_mutex_unlock(&ht->resize_mutex); - ht->flavor->thread_online(); + mutex_unlock(&ht->resize_mutex); + ht->flavor->unregister_thread(); poison_free(work); - cmm_smp_mb(); /* finish resize before decrement */ - uatomic_dec(&ht->in_progress_resize); } static void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht) { - struct rcu_resize_work *work; + struct resize_work *work; /* Store resize_target before read resize_initiated */ cmm_smp_mb(); if (!CMM_LOAD_SHARED(ht->resize_initiated)) { - uatomic_inc(&ht->in_progress_resize); - cmm_smp_mb(); /* increment resize count before load destroy */ if (CMM_LOAD_SHARED(ht->in_progress_destroy)) { - uatomic_dec(&ht->in_progress_resize); return; } work = malloc(sizeof(*work)); if (work == NULL) { dbg_printf("error allocating resize work, bailing out\n"); - uatomic_dec(&ht->in_progress_resize); return; } work->ht = ht; - ht->flavor->update_call_rcu(&work->head, do_resize_cb); + urcu_workqueue_queue_work(cds_lfht_workqueue, + &work->work, do_resize_cb); CMM_STORE_SHARED(ht->resize_initiated, 1); } } @@ -2045,3 +2118,90 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size, } __cds_lfht_resize_lazy_launch(ht); } + +static void cds_lfht_before_fork(void *priv __attribute__((unused))) +{ + if (cds_lfht_workqueue_atfork_nesting++) + return; + mutex_lock(&cds_lfht_fork_mutex); + if (!cds_lfht_workqueue) + return; + urcu_workqueue_pause_worker(cds_lfht_workqueue); +} + +static void cds_lfht_after_fork_parent(void *priv __attribute__((unused))) +{ + if (--cds_lfht_workqueue_atfork_nesting) + return; + if (!cds_lfht_workqueue) + goto end; + urcu_workqueue_resume_worker(cds_lfht_workqueue); +end: + mutex_unlock(&cds_lfht_fork_mutex); +} + +static void cds_lfht_after_fork_child(void *priv __attribute__((unused))) +{ + if (--cds_lfht_workqueue_atfork_nesting) + return; + if (!cds_lfht_workqueue) + goto end; + urcu_workqueue_create_worker(cds_lfht_workqueue); +end: + mutex_unlock(&cds_lfht_fork_mutex); +} + +static struct urcu_atfork cds_lfht_atfork = { + .before_fork = cds_lfht_before_fork, + .after_fork_parent = cds_lfht_after_fork_parent, + .after_fork_child = cds_lfht_after_fork_child, +}; + +/* + * Block all signals for the workqueue worker thread to ensure we don't + * disturb the application. The SIGRCU signal needs to be unblocked for + * the urcu-signal flavor. 
+ */ +static void cds_lfht_worker_init( + struct urcu_workqueue *workqueue __attribute__((unused)), + void *priv __attribute__((unused))) +{ + int ret; + sigset_t mask; + + ret = sigfillset(&mask); + if (ret) + urcu_die(errno); + ret = sigdelset(&mask, SIGRCU); + if (ret) + urcu_die(errno); + ret = pthread_sigmask(SIG_SETMASK, &mask, NULL); + if (ret) + urcu_die(ret); +} + +static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor) +{ + flavor->register_rculfhash_atfork(&cds_lfht_atfork); + + mutex_lock(&cds_lfht_fork_mutex); + if (cds_lfht_workqueue_user_count++) + goto end; + cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL, + NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL); +end: + mutex_unlock(&cds_lfht_fork_mutex); +} + +static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor) +{ + mutex_lock(&cds_lfht_fork_mutex); + if (--cds_lfht_workqueue_user_count) + goto end; + urcu_workqueue_destroy(cds_lfht_workqueue); + cds_lfht_workqueue = NULL; +end: + mutex_unlock(&cds_lfht_fork_mutex); + + flavor->unregister_rculfhash_atfork(&cds_lfht_atfork); +}
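
The hunks above lean heavily on is_removed(), is_bucket(), is_removal_owner() and clear_flag(), which encode per-link state in the low bits of 8-byte-aligned node pointers. The standalone sketch below restates that tagging scheme outside the library; the toy_* names are made up, and the flag values merely mirror rculfhash's REMOVED/BUCKET/REMOVAL_OWNER layout rather than being taken from this patch.

#include <stdint.h>

#define TOY_REMOVED_FLAG	(1UL << 0)
#define TOY_BUCKET_FLAG		(1UL << 1)
#define TOY_REMOVAL_OWNER_FLAG	(1UL << 2)
#define TOY_FLAGS_MASK		((1UL << 3) - 1)

/*
 * Nodes are 8-byte aligned, so the three low bits of a node pointer are
 * always zero and can carry per-link state without extra storage.
 */
struct toy_node {
	struct toy_node *next;		/* low bits hold the flags */
	unsigned long reverse_hash;
} __attribute__((aligned(8)));

static inline struct toy_node *toy_clear_flag(struct toy_node *p)
{
	/* Strip all flag bits to recover the real pointer. */
	return (struct toy_node *) ((uintptr_t) p & ~(uintptr_t) TOY_FLAGS_MASK);
}

static inline int toy_is_removed(const struct toy_node *p)
{
	return !!((uintptr_t) p & TOY_REMOVED_FLAG);
}

static inline int toy_is_bucket(const struct toy_node *p)
{
	return !!((uintptr_t) p & TOY_BUCKET_FLAG);
}

static inline struct toy_node *toy_flag_removed(struct toy_node *p)
{
	/* Logical removal: mark the link rather than unlinking right away. */
	return (struct toy_node *) ((uintptr_t) p | TOY_REMOVED_FLAG);
}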
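
The ht_count_add()/ht_count_del() changes build on the split-counter scheme described at the top of the file: each updater bumps a slot chosen from its CPU number, and the global counter is only nudged every 1024 operations to trigger resize. A minimal approximation of the idea, with made-up toy_* names, might look like this; the real code lazily pushes deltas to a global counter instead of summing all slots on every read.

#define _GNU_SOURCE
#include <sched.h>
#include <stdatomic.h>

#define TOY_NR_SLOTS	64	/* power of two, ideally >= number of CPUs */

struct toy_split_counter {
	_Atomic unsigned long slot[TOY_NR_SLOTS];
};

static inline unsigned int toy_slot_index(void)
{
	int cpu = sched_getcpu();	/* may fail: fall back to slot 0 */

	return (cpu < 0 ? 0U : (unsigned int) cpu) & (TOY_NR_SLOTS - 1);
}

/* Updaters touch only "their" slot, avoiding one globally contended line. */
static inline void toy_split_counter_add(struct toy_split_counter *c, long v)
{
	atomic_fetch_add_explicit(&c->slot[toy_slot_index()],
			(unsigned long) v, memory_order_relaxed);
}

/* Readers sum all slots; the result is approximate by design. */
static inline unsigned long toy_split_counter_read(struct toy_split_counter *c)
{
	unsigned long sum = 0;
	unsigned int i;

	for (i = 0; i < TOY_NR_SLOTS; i++)
		sum += atomic_load_explicit(&c->slot[i], memory_order_relaxed);
	return sum;
}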
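
Finally, to put the reworked internals in context, here is a minimal usage sketch of the public API in the spirit of the doc/examples programs. struct mynode, match_mynode() and the trivial hash are illustrative assumptions, and the default flavour pulled in by <urcu.h> is assumed. With this patch, CDS_LFHT_AUTO_RESIZE work is queued to the library's internal worker thread instead of call_rcu, and cds_lfht_destroy() flushes that workqueue before tearing the table down.

#include <stdlib.h>

#include <urcu.h>		/* RCU flavour */
#include <urcu/rculfhash.h>	/* RCU lock-free hash table */

struct mynode {
	int value;			/* hypothetical payload */
	struct cds_lfht_node node;	/* chaining within the hash table */
	struct rcu_head rcu_head;	/* deferred reclaim via call_rcu() */
};

static int match_mynode(struct cds_lfht_node *ht_node, const void *key)
{
	struct mynode *n = caa_container_of(ht_node, struct mynode, node);

	return n->value == *(const int *) key;
}

static void free_mynode(struct rcu_head *head)
{
	free(caa_container_of(head, struct mynode, rcu_head));
}

int main(void)
{
	struct cds_lfht *ht;
	struct cds_lfht_iter iter;
	struct cds_lfht_node *ht_node;
	struct mynode *n;
	int key = 42;

	rcu_register_thread();

	/* Automatic resize now runs on the library's worker thread. */
	ht = cds_lfht_new(1, 1, 0,
			CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
	if (!ht)
		abort();

	n = malloc(sizeof(*n));
	if (!n)
		abort();
	n->value = key;
	cds_lfht_node_init(&n->node);

	/* Add and remove must run under the RCU read-side lock. */
	rcu_read_lock();
	cds_lfht_add(ht, (unsigned long) key /* toy hash */, &n->node);
	rcu_read_unlock();

	rcu_read_lock();
	cds_lfht_lookup(ht, (unsigned long) key, match_mynode, &key, &iter);
	ht_node = cds_lfht_iter_get_node(&iter);
	if (ht_node && !cds_lfht_del(ht, ht_node)) {
		/* Unlinked: free the memory after a grace period. */
		n = caa_container_of(ht_node, struct mynode, node);
		call_rcu(&n->rcu_head, free_mynode);
	}
	rcu_read_unlock();

	/* Flushes pending resize work queued on the internal workqueue. */
	if (cds_lfht_destroy(ht, NULL))
		abort();
	rcu_barrier();		/* wait for pending call_rcu handlers */
	rcu_unregister_thread();
	return 0;
}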