diff --git a/src/rculfhash.c b/src/rculfhash.c
index d7a1f23..811d515 100644
--- a/src/rculfhash.c
+++ b/src/rculfhash.c
@@ -64,7 +64,7 @@
  * - Split-counters are used to keep track of the number of
  *   nodes within the hash table for automatic resize triggering.
  * - Resize operation initiated by long chain detection is executed by a
- *   call_rcu thread, which keeps lock-freedom of add and remove.
+ *   worker thread, which keeps lock-freedom of add and remove.
  * - Resize operations are protected by a mutex.
  * - The removal operation is split in two parts: first, a "removed"
  *   flag is set in the next pointer within the node to remove. Then,
@@ -266,16 +266,21 @@
 #include <sched.h>
 
 #include "compat-getcpu.h"
-#include <urcu-pointer.h>
-#include <urcu-call-rcu.h>
-#include <urcu-flavor.h>
+#include <urcu/pointer.h>
+#include <urcu/call-rcu.h>
+#include <urcu/flavor.h>
 #include <urcu/arch.h>
 #include <urcu/uatomic.h>
 #include <urcu/compiler.h>
 #include <urcu/rculfhash.h>
+#include <rculfhash-internal.h>
 #include <stdio.h>
 #include <pthread.h>
+#include <signal.h>
+#include "workqueue.h"
+#include "urcu-die.h"
+#include "urcu-utils.h"
 
 /*
  * Split-counters lazily update the global counter each 1024
@@ -335,11 +340,11 @@ struct ht_items_count {
 } __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
 
 /*
- * rcu_resize_work: Contains arguments passed to RCU worker thread
+ * resize_work: Contains arguments passed to worker thread
  * responsible for performing lazy resize.
  */
-struct rcu_resize_work {
-	struct rcu_head head;
+struct resize_work {
+	struct urcu_work work;
 	struct cds_lfht *ht;
 };
 
@@ -356,6 +361,48 @@ struct partition_resize_work {
 			unsigned long start, unsigned long len);
 };
 
+static struct urcu_workqueue *cds_lfht_workqueue;
+static unsigned long cds_lfht_workqueue_user_count;
+
+/*
+ * Mutex ensuring mutual exclusion between workqueue initialization and
+ * fork handlers. cds_lfht_fork_mutex nests inside call_rcu_mutex.
+ */
+static pthread_mutex_t cds_lfht_fork_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static struct urcu_atfork cds_lfht_atfork;
+
+/*
+ * atfork handler nesting counter. Handle being registered to many urcu
+ * flavors, thus being possibly invoked more than once in the
+ * pthread_atfork list of callbacks.
+ */
+static int cds_lfht_workqueue_atfork_nesting;
+
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
+
+#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
+
+static
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+	iter->lfht = ht;
+}
+
+#define cds_lfht_iter_debug_assert(...)	assert(__VA_ARGS__)
+
+#else
+
+static
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+}
+
+#define cds_lfht_iter_debug_assert(...)
+
+#endif
+
 /*
  * Algorithm to reverse bits in a word by lookup table, extended to
  * 64-bit words.
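The design notes above describe resizes triggered by long-chain detection and executed on a worker thread. For orientation, here is a minimal usage sketch of the public API implemented by this file. It is not part of the patch; it assumes the default urcu-memb flavor (plain <urcu.h>) and uses the integer key itself as a stand-in hash function:

    #define _LGPL_SOURCE
    #include <urcu.h>               /* default (memb) flavor */
    #include <urcu/rculfhash.h>
    #include <stdio.h>
    #include <stdlib.h>

    struct mynode {
            int key;
            struct cds_lfht_node node;      /* chaining in the hash table */
    };

    static int match(struct cds_lfht_node *ht_node, const void *key)
    {
            struct mynode *n = caa_container_of(ht_node, struct mynode, node);

            return n->key == *(const int *) key;
    }

    int main(void)
    {
            struct cds_lfht *ht;
            struct cds_lfht_iter iter;
            struct mynode *n;
            int key = 42;

            rcu_register_thread();
            /* AUTO_RESIZE: chain-driven resizes run on the shared worker. */
            ht = cds_lfht_new(1, 1, 0,
                    CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
            if (!ht)
                    abort();
            n = malloc(sizeof(*n));
            n->key = key;
            cds_lfht_node_init(&n->node);

            rcu_read_lock();
            cds_lfht_add(ht, (unsigned long) key, &n->node);
            cds_lfht_lookup(ht, (unsigned long) key, match, &key, &iter);
            printf("found: %s\n", cds_lfht_iter_get_node(&iter) ? "yes" : "no");
            (void) cds_lfht_del(ht, &n->node);
            rcu_read_unlock();

            synchronize_rcu();      /* grace period before freeing the node */
            free(n);
            (void) cds_lfht_destroy(ht, NULL);  /* outside read-side C.S. */
            rcu_unregister_thread();
            return 0;
    }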
@@ -561,6 +608,37 @@ static
 void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
 				unsigned long count);
 
+static void mutex_lock(pthread_mutex_t *mutex)
+{
+	int ret;
+
+#ifndef DISTRUST_SIGNALS_EXTREME
+	ret = pthread_mutex_lock(mutex);
+	if (ret)
+		urcu_die(ret);
+#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
+	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
+		if (ret != EBUSY && ret != EINTR)
+			urcu_die(ret);
+		if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) {
+			cmm_smp_mb();
+			_CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0);
+			cmm_smp_mb();
+		}
+		(void) poll(NULL, 0, 10);
+	}
+#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
+}
+
+static void mutex_unlock(pthread_mutex_t *mutex)
+{
+	int ret;
+
+	ret = pthread_mutex_unlock(mutex);
+	if (ret)
+		urcu_die(ret);
+}
+
 static long nr_cpus_mask = -1;
 static long split_count_mask = -1;
 static int split_count_order = -1;
@@ -635,9 +713,8 @@ int ht_get_split_count_index(unsigned long hash)
 static
 void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 {
-	unsigned long split_count;
+	unsigned long split_count, count;
 	int index;
-	long count;
 
 	if (caa_unlikely(!ht->split_count))
 		return;
@@ -656,7 +733,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 	if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
 		return;
-	dbg_printf("add set global %ld\n", count);
+	dbg_printf("add set global %lu\n", count);
 	cds_lfht_resize_lazy_count(ht, size,
 			count >> (CHAIN_LEN_TARGET - 1));
 }
@@ -664,9 +741,8 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 static
 void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 {
-	unsigned long split_count;
+	unsigned long split_count, count;
 	int index;
-	long count;
 
 	if (caa_unlikely(!ht->split_count))
 		return;
@@ -1013,7 +1089,13 @@ void _cds_lfht_add(struct cds_lfht *ht,
 		if (unique_ret
 		    && !is_bucket(next)
 		    && clear_flag(iter)->reverse_hash == node->reverse_hash) {
-			struct cds_lfht_iter d_iter = { .node = node, .next = iter, };
+			struct cds_lfht_iter d_iter = {
+				.node = node,
+				.next = iter,
+#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
+				.lfht = ht,
+#endif
+			};
 
 			/*
 			 * uniquely adding inserts the node as the first
@@ -1165,8 +1247,8 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 {
 	unsigned long partition_len, start = 0;
 	struct partition_resize_work *work;
-	int thread, ret;
-	unsigned long nr_threads;
+	int ret;
+	unsigned long thread, nr_threads;
 
 	assert(nr_cpus_mask != -1);
 	if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
@@ -1178,7 +1260,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 	 * partition size, up to the number of CPUs in the system.
 	 */
 	if (nr_cpus_mask > 0) {
-		nr_threads = min(nr_cpus_mask + 1,
+		nr_threads = min_t(unsigned long, nr_cpus_mask + 1,
 				 len >> MIN_PARTITION_PER_THREAD_ORDER);
 	} else {
 		nr_threads = 1;
@@ -1224,14 +1306,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 	if (start == 0 && nr_threads > 0)
 		return;
 fallback:
-	ht->flavor->thread_online();
 	fct(ht, i, start, len);
-	ht->flavor->thread_offline();
 }
 
 /*
  * Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
  * problem).
  *
  * When we reach a certain length, we can split this population phase over
  * many worker threads, based on the number of CPUs available in the system.
  * This should therefore take care of not having the expand lagging behind too
  * many concurrent insertion threads by using the scheduler's ability to
  * schedule bucket node population fairly with insertions.
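partition_resize_helper() above splits `len` items of bucket-population work across up to one thread per CPU while keeping every partition at least MIN_PARTITION_PER_THREAD items long. The following standalone sketch shows only that arithmetic; the CPU count and work length are made-up values, and the real code rounds partition_len down to a power of two via the count order:

    #include <stdio.h>

    #define MIN_PARTITION_PER_THREAD_ORDER  12
    #define MIN_PARTITION_PER_THREAD        (1UL << MIN_PARTITION_PER_THREAD_ORDER)

    int main(void)
    {
            unsigned long len = 1UL << 20;  /* work items (bucket nodes) */
            unsigned long nr_cpus = 8;      /* stand-in for nr_cpus_mask + 1 */
            unsigned long nr_threads, partition_len, thread;

            /* One thread per MIN_PARTITION_PER_THREAD items, capped at CPUs. */
            nr_threads = len >> MIN_PARTITION_PER_THREAD_ORDER;
            if (nr_threads > nr_cpus)
                    nr_threads = nr_cpus;
            if (!nr_threads)
                    nr_threads = 1;
            partition_len = len / nr_threads;       /* real code: len >> order */

            for (thread = 0; thread < nr_threads; thread++)
                    printf("thread %lu: [%lu, %lu)\n", thread,
                            thread * partition_len,
                            (thread + 1) * partition_len);
            return 0;
    }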
@@ -1308,7 +1388,7 @@ void init_table(struct cds_lfht *ht,
 
 /*
  * Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
  * problem).
  * For a single level, we logically remove and garbage collect each node.
  *
@@ -1320,8 +1400,9 @@ void init_table(struct cds_lfht *ht,
  *
  * Concurrent removal and add operations are helping us perform garbage
  * collection of logically removed nodes. We guarantee that all logically
- * removed nodes have been garbage-collected (unlinked) before call_rcu is
- * invoked to free a whole level of bucket nodes (after a grace period).
+ * removed nodes have been garbage-collected (unlinked) before work
+ * enqueue is invoked to free a whole level of bucket nodes (after a
+ * grace period).
  *
  * Logical removal and garbage collection can therefore be done in batch
  * or on a node-per-node basis, as long as the guarantee above holds.
@@ -1368,8 +1449,7 @@ static
 void fini_table(struct cds_lfht *ht,
 		unsigned long first_order, unsigned long last_order)
 {
-	long i;
-	unsigned long free_by_rcu_order = 0;
+	unsigned long free_by_rcu_order = 0, i;
 
 	dbg_printf("fini table: first_order %lu last_order %lu\n",
 		   first_order, last_order);
@@ -1418,11 +1498,15 @@ void fini_table(struct cds_lfht *ht,
 	}
 }
 
+/*
+ * Never called with size < 1.
+ */
 static
 void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
 {
 	struct cds_lfht_node *prev, *node;
 	unsigned long order, len, i;
+	int bucket_order;
 
 	cds_lfht_alloc_bucket_table(ht, 0);
@@ -1431,7 +1515,10 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
 	node->next = flag_bucket(get_end());
 	node->reverse_hash = 0;
 
-	for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) {
+	bucket_order = cds_lfht_get_count_order_ulong(size);
+	assert(bucket_order >= 0);
+
+	for (order = 1; order < (unsigned long) bucket_order + 1; order++) {
 		len = 1UL << (order - 1);
 		cds_lfht_alloc_bucket_table(ht, order);
@@ -1462,6 +1549,32 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
 	}
 }
 
+#if (CAA_BITS_PER_LONG > 32)
+/*
+ * For 64-bit architectures, with max number of buckets small enough not to
+ * use the entire 64-bit memory mapping space (and allowing a fair number of
+ * hash table instances), use the mmap allocator, which is faster. Otherwise,
+ * fall back to the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+	if (max_nr_buckets && max_nr_buckets <= (1ULL << 32))
+		return &cds_lfht_mm_mmap;
+	else
+		return &cds_lfht_mm_order;
+}
+#else
+/*
+ * For 32-bit architectures, use the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+	return &cds_lfht_mm_order;
+}
+#endif
+
 struct cds_lfht *_cds_lfht_new(unsigned long init_size,
 			unsigned long min_nr_alloc_buckets,
 			unsigned long max_nr_buckets,
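get_mm_type() chooses between the mmap allocator and the order allocator. Under the order allocator, the bucket table is split into levels: order 0 holds bucket 0, and order k holds the 2^(k-1) buckets [2^(k-1), 2^k), which is why cds_lfht_create_bucket() walks orders 1 through log2(size). A hypothetical sketch of the index-to-level arithmetic (the helper name and output format are illustrative, not internals of this file):

    #include <stdio.h>

    /*
     * Illustrative only: map a flat bucket index to the (order, offset)
     * pair used by an order-based bucket table, where order 0 holds
     * bucket 0 and order k holds buckets [2^(k-1), 2^k).
     */
    static void locate(unsigned long index)
    {
            unsigned long order = 0, v = index;

            while (v) {             /* order = fls(index) */
                    v >>= 1;
                    order++;
            }
            if (!index)
                    printf("bucket 0 -> order 0, offset 0\n");
            else
                    printf("bucket %lu -> order %lu, offset %lu\n",
                            index, order, index - (1UL << (order - 1)));
    }

    int main(void)
    {
            unsigned long i;

            for (i = 0; i < 8; i++)
                    locate(i);
            return 0;
    }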
@@ -1484,26 +1597,8 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
 	/*
 	 * Memory management plugin default.
 	 */
-	if (!mm) {
-		if (CAA_BITS_PER_LONG > 32
-				&& max_nr_buckets
-				&& max_nr_buckets <= (1ULL << 32)) {
-			/*
-			 * For 64-bit architectures, with max number of
-			 * buckets small enough not to use the entire
-			 * 64-bit memory mapping space (and allowing a
-			 * fair number of hash table instances), use the
-			 * mmap allocator, which is faster than the
-			 * order allocator.
-			 */
-			mm = &cds_lfht_mm_mmap;
-		} else {
-			/*
-			 * The fallback is to use the order allocator.
-			 */
-			mm = &cds_lfht_mm_order;
-		}
-	}
+	if (!mm)
+		mm = get_mm_type(max_nr_buckets);
 
 	/* max_nr_buckets == 0 for order based mm means infinite */
 	if (mm == &cds_lfht_mm_order && !max_nr_buckets)
@@ -1513,6 +1608,9 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
 	if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
 		return NULL;
 
+	if (flags & CDS_LFHT_AUTO_RESIZE)
+		cds_lfht_init_worker(flavor);
+
 	min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
 	init_size = max(init_size, MIN_TABLE_SIZE);
 	max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
@@ -1543,6 +1641,8 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
 	struct cds_lfht_node *node, *next, *bucket;
 	unsigned long reverse_hash, size;
 
+	cds_lfht_iter_debug_set_ht(ht, iter);
+
 	reverse_hash = bit_reverse_ulong(hash);
 
 	size = rcu_dereference(ht->size);
@@ -1580,6 +1680,7 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
 	struct cds_lfht_node *node, *next;
 	unsigned long reverse_hash;
 
+	cds_lfht_iter_debug_assert(ht == iter->lfht);
 	node = iter->node;
 	reverse_hash = node->reverse_hash;
 	next = iter->next;
@@ -1611,6 +1712,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 {
 	struct cds_lfht_node *node, *next;
 
+	cds_lfht_iter_debug_assert(ht == iter->lfht);
 	node = clear_flag(iter->next);
 	for (;;) {
 		if (caa_unlikely(is_end(node))) {
@@ -1631,6 +1733,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 
 void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 {
+	cds_lfht_iter_debug_set_ht(ht, iter);
 	/*
 	 * Get next after first bucket node. The first bucket node is the
 	 * first node of the linked list.
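The iterator entry points above now record and check iter->lfht under CONFIG_CDS_LFHT_ITER_DEBUG, so an iterator primed on one table and consumed on another trips an assertion. For reference, a hedged sketch of a normal full traversal built on the public cds_lfht_for_each_entry() helper (struct mynode as in the earlier sketch):

    #define _LGPL_SOURCE
    #include <urcu.h>
    #include <urcu/rculfhash.h>
    #include <stdio.h>

    struct mynode {
            int key;
            struct cds_lfht_node node;
    };

    /* Walk every node; must run under rcu_read_lock(). */
    static void dump_table(struct cds_lfht *ht)
    {
            struct cds_lfht_iter iter;
            struct mynode *pos;

            rcu_read_lock();
            cds_lfht_for_each_entry(ht, &iter, pos, node)
                    printf("key: %d\n", pos->key);
            rcu_read_unlock();
    }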
@@ -1772,25 +1875,14 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
  */
 int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 {
-	int ret, was_online;
-
-	/* Wait for in-flight resize operations to complete */
-	_CMM_STORE_SHARED(ht->in_progress_destroy, 1);
-	cmm_smp_mb();	/* Store destroy before load resize */
-	was_online = ht->flavor->read_ongoing();
-	if (was_online)
-		ht->flavor->thread_offline();
-	/* Calling with RCU read-side held is an error. */
-	if (ht->flavor->read_ongoing()) {
-		ret = -EINVAL;
-		if (was_online)
-			ht->flavor->thread_online();
-		goto end;
+	int ret;
+
+	if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+		/* Cancel ongoing resize operations. */
+		_CMM_STORE_SHARED(ht->in_progress_destroy, 1);
+		/* Wait for in-flight resize operations to complete */
+		urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
 	}
-	while (uatomic_read(&ht->in_progress_resize))
-		poll(NULL, 0, 100);	/* wait for 100ms */
-	if (was_online)
-		ht->flavor->thread_online();
 	ret = cds_lfht_delete_bucket(ht);
 	if (ret)
 		return ret;
@@ -1800,8 +1892,9 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 	ret = pthread_mutex_destroy(&ht->resize_mutex);
 	if (ret)
 		ret = -EBUSY;
+	if (ht->flags & CDS_LFHT_AUTO_RESIZE)
+		cds_lfht_fini_worker(ht->flavor);
 	poison_free(ht);
-end:
 	return ret;
 }
@@ -1897,7 +1990,6 @@ void _do_cds_lfht_resize(struct cds_lfht *ht)
 	 * Resize table, re-do if the target size has changed under us.
 	 */
 	do {
-		assert(uatomic_read(&ht->in_progress_resize));
 		if (CMM_LOAD_SHARED(ht->in_progress_destroy))
 			break;
 		ht->resize_initiated = 1;
@@ -1930,71 +2022,47 @@ void resize_target_update_count(struct cds_lfht *ht,
 
 void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
-	int was_online;
-
-	was_online = ht->flavor->read_ongoing();
-	if (was_online)
-		ht->flavor->thread_offline();
-	/* Calling with RCU read-side held is an error. */
-	if (ht->flavor->read_ongoing()) {
-		static int print_once;
-
-		if (!CMM_LOAD_SHARED(print_once))
-			fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
-				"called with RCU read-side lock held.\n");
-		CMM_STORE_SHARED(print_once, 1);
-		assert(0);
-		goto end;
-	}
 	resize_target_update_count(ht, new_size);
 	CMM_STORE_SHARED(ht->resize_initiated, 1);
-	pthread_mutex_lock(&ht->resize_mutex);
+	mutex_lock(&ht->resize_mutex);
 	_do_cds_lfht_resize(ht);
-	pthread_mutex_unlock(&ht->resize_mutex);
-end:
-	if (was_online)
-		ht->flavor->thread_online();
+	mutex_unlock(&ht->resize_mutex);
 }
 
 static
-void do_resize_cb(struct rcu_head *head)
+void do_resize_cb(struct urcu_work *work)
 {
-	struct rcu_resize_work *work =
-		caa_container_of(head, struct rcu_resize_work, head);
-	struct cds_lfht *ht = work->ht;
+	struct resize_work *resize_work =
+		caa_container_of(work, struct resize_work, work);
+	struct cds_lfht *ht = resize_work->ht;
 
-	ht->flavor->thread_offline();
-	pthread_mutex_lock(&ht->resize_mutex);
+	ht->flavor->register_thread();
+	mutex_lock(&ht->resize_mutex);
 	_do_cds_lfht_resize(ht);
-	pthread_mutex_unlock(&ht->resize_mutex);
-	ht->flavor->thread_online();
+	mutex_unlock(&ht->resize_mutex);
+	ht->flavor->unregister_thread();
 	poison_free(work);
-	cmm_smp_mb();	/* finish resize before decrement */
-	uatomic_dec(&ht->in_progress_resize);
 }
 
 static
 void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
 {
-	struct rcu_resize_work *work;
+	struct resize_work *work;
 
 	/* Store resize_target before read resize_initiated */
 	cmm_smp_mb();
 	if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
-		uatomic_inc(&ht->in_progress_resize);
-		cmm_smp_mb();	/* increment resize count before load destroy */
 		if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
-			uatomic_dec(&ht->in_progress_resize);
 			return;
 		}
 		work = malloc(sizeof(*work));
 		if (work == NULL) {
 			dbg_printf("error allocating resize work, bailing out\n");
-			uatomic_dec(&ht->in_progress_resize);
 			return;
 		}
 		work->ht = ht;
-		ht->flavor->update_call_rcu(&work->head, do_resize_cb);
+		urcu_workqueue_queue_work(cds_lfht_workqueue,
+			&work->work, do_resize_cb);
 		CMM_STORE_SHARED(ht->resize_initiated, 1);
 	}
 }
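After this change, cds_lfht_resize() simply serializes on resize_mutex and performs the resize in the calling thread, while lazy resizes travel through the workqueue as urcu_work items. It must still be called outside any RCU read-side critical section, since the resize waits for grace periods. A small hypothetical helper showing intended usage (the name and sizing policy are illustrative):

    #define _LGPL_SOURCE
    #include <urcu.h>
    #include <urcu/rculfhash.h>

    /*
     * Pre-size the table before a bulk load so the lazy, chain-length
     * driven resizes queued on the workqueue do not lag behind the
     * insertions.  Must not be called under rcu_read_lock().
     */
    static void presize_for_bulk_load(struct cds_lfht *ht,
                    unsigned long expected_items)
    {
            cds_lfht_resize(ht, expected_items);
    }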
@@ -2045,3 +2113,89 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
 	}
 	__cds_lfht_resize_lazy_launch(ht);
 }
+
+static void cds_lfht_before_fork(void *priv)
+{
+	if (cds_lfht_workqueue_atfork_nesting++)
+		return;
+	mutex_lock(&cds_lfht_fork_mutex);
+	if (!cds_lfht_workqueue)
+		return;
+	urcu_workqueue_pause_worker(cds_lfht_workqueue);
+}
+
+static void cds_lfht_after_fork_parent(void *priv)
+{
+	if (--cds_lfht_workqueue_atfork_nesting)
+		return;
+	if (!cds_lfht_workqueue)
+		goto end;
+	urcu_workqueue_resume_worker(cds_lfht_workqueue);
+end:
+	mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_after_fork_child(void *priv)
+{
+	if (--cds_lfht_workqueue_atfork_nesting)
+		return;
+	if (!cds_lfht_workqueue)
+		goto end;
+	urcu_workqueue_create_worker(cds_lfht_workqueue);
+end:
+	mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static struct urcu_atfork cds_lfht_atfork = {
+	.before_fork = cds_lfht_before_fork,
+	.after_fork_parent = cds_lfht_after_fork_parent,
+	.after_fork_child = cds_lfht_after_fork_child,
+};
+
+/*
+ * Block all signals for the workqueue worker thread to ensure we don't
+ * disturb the application. The SIGRCU signal needs to be unblocked for
+ * the urcu-signal flavor.
+ */
+static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
+		void *priv)
+{
+	int ret;
+	sigset_t mask;
+
+	ret = sigfillset(&mask);
+	if (ret)
+		urcu_die(errno);
+	ret = sigdelset(&mask, SIGRCU);
+	if (ret)
+		urcu_die(errno);
+	ret = pthread_sigmask(SIG_SETMASK, &mask, NULL);
+	if (ret)
+		urcu_die(ret);
+}
+
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
+{
+	flavor->register_rculfhash_atfork(&cds_lfht_atfork);
+
+	mutex_lock(&cds_lfht_fork_mutex);
+	if (cds_lfht_workqueue_user_count++)
+		goto end;
+	cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+		NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
+end:
+	mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+{
+	mutex_lock(&cds_lfht_fork_mutex);
+	if (--cds_lfht_workqueue_user_count)
+		goto end;
+	urcu_workqueue_destroy(cds_lfht_workqueue);
+	cds_lfht_workqueue = NULL;
+end:
+	mutex_unlock(&cds_lfht_fork_mutex);
+
+	flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
+}
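cds_lfht_worker_init() above relies on a standard POSIX idiom: each thread manipulates its own signal mask with pthread_sigmask(), and a mask installed before pthread_create() is inherited by the new thread. A self-contained sketch of that pattern follows (SIGRCU handling is omitted, as it only matters for the urcu-signal flavor):

    #include <pthread.h>
    #include <signal.h>
    #include <stdio.h>

    /* Worker blocks all async signals so they are delivered elsewhere. */
    static void *worker(void *arg)
    {
            sigset_t mask;

            sigfillset(&mask);
            /* liburcu additionally does: sigdelset(&mask, SIGRCU); */
            pthread_sigmask(SIG_SETMASK, &mask, NULL);

            /* ... process queued work ... */
            return NULL;
    }

    int main(void)
    {
            pthread_t tid;

            pthread_create(&tid, NULL, worker, NULL);
            pthread_join(tid, NULL);
            return 0;
    }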