X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=rculfhash.c;h=d7d107f3788f3a70cec57be5380ae1ff1bbff173;hp=b9f795fd1349e6b28e2ac4c1e1f0de57154adad7;hb=a59f39055b5ecb77b68cf78b9839aa9e8e4ec332;hpb=5f177b1c042fd3f1b746dee55a90cc503c5276ee diff --git a/rculfhash.c b/rculfhash.c index b9f795f..d7d107f 100644 --- a/rculfhash.c +++ b/rculfhash.c @@ -117,7 +117,7 @@ * To discuss these guarantees, we first define "read" operation as any * of the the basic cds_lfht_lookup, cds_lfht_next_duplicate, * cds_lfht_first, cds_lfht_next operation, as well as - * cds_lfht_add_unique (failure). + * cds_lfht_add_unique (failure). * * We define "read traversal" operation as any of the following * group of operations @@ -129,7 +129,7 @@ * - cds_lfht_first followed iteration with cds_lfht_next (and/or * cds_lfht_next_duplicate, although less common). * - * We define "write" operations as any of cds_lfht_add, + * We define "write" operations as any of cds_lfht_add, cds_lfht_replace, * cds_lfht_add_unique (success), cds_lfht_add_replace, cds_lfht_del. * * When cds_lfht_add_unique succeeds (returns the node passed as @@ -225,12 +225,12 @@ * shrink hash table from order 6 to 5: fini the index=6 bucket node table * * A bit of ascii art explanation: - * + * * The order index is the off-by-one compared to the actual power of 2 * because we use index 0 to deal with the 0 special-case. - * + * * This shows the nodes for a small table ordered by reversed bits: - * + * * bits reverse * 0 000 000 * 4 100 001 @@ -240,10 +240,10 @@ * 5 101 101 * 3 011 110 * 7 111 111 - * - * This shows the nodes in order of non-reversed bits, linked by + * + * This shows the nodes in order of non-reversed bits, linked by * reversed-bit order. - * + * * order bits reverse * 0 0 000 000 * 1 | 1 001 100 <- @@ -264,9 +264,11 @@ #include #include #include +#include #include "config.h" -#include +#include "compat-getcpu.h" +#include #include #include #include @@ -281,7 +283,7 @@ * Split-counters lazily update the global counter each 1024 * addition/removal. It automatically keeps track of resize required. * We use the bucket length as indicator for need to expand for small - * tables and machines lacking per-cpu data suppport. + * tables and machines lacking per-cpu data support. */ #define COUNT_COMMIT_ORDER 10 #define DEFAULT_SPLIT_COUNT_MASK 0xFUL @@ -364,7 +366,7 @@ struct partition_resize_work { * Originally from Public Domain. 
*/ -static const uint8_t BitReverseTable256[256] = +static const uint8_t BitReverseTable256[256] = { #define R2(n) (n), (n) + 2*64, (n) + 1*64, (n) + 3*64 #define R4(n) R2(n), R2((n) + 2*16), R2((n) + 1*16), R2((n) + 3*16) @@ -381,27 +383,29 @@ uint8_t bit_reverse_u8(uint8_t v) return BitReverseTable256[v]; } -static __attribute__((unused)) +#if (CAA_BITS_PER_LONG == 32) +static uint32_t bit_reverse_u32(uint32_t v) { - return ((uint32_t) bit_reverse_u8(v) << 24) | - ((uint32_t) bit_reverse_u8(v >> 8) << 16) | - ((uint32_t) bit_reverse_u8(v >> 16) << 8) | + return ((uint32_t) bit_reverse_u8(v) << 24) | + ((uint32_t) bit_reverse_u8(v >> 8) << 16) | + ((uint32_t) bit_reverse_u8(v >> 16) << 8) | ((uint32_t) bit_reverse_u8(v >> 24)); } - -static __attribute__((unused)) +#else +static uint64_t bit_reverse_u64(uint64_t v) { - return ((uint64_t) bit_reverse_u8(v) << 56) | - ((uint64_t) bit_reverse_u8(v >> 8) << 48) | + return ((uint64_t) bit_reverse_u8(v) << 56) | + ((uint64_t) bit_reverse_u8(v >> 8) << 48) | ((uint64_t) bit_reverse_u8(v >> 16) << 40) | ((uint64_t) bit_reverse_u8(v >> 24) << 32) | - ((uint64_t) bit_reverse_u8(v >> 32) << 24) | - ((uint64_t) bit_reverse_u8(v >> 40) << 16) | + ((uint64_t) bit_reverse_u8(v >> 32) << 24) | + ((uint64_t) bit_reverse_u8(v >> 40) << 16) | ((uint64_t) bit_reverse_u8(v >> 48) << 8) | ((uint64_t) bit_reverse_u8(v >> 56)); } +#endif static unsigned long bit_reverse_ulong(unsigned long v) @@ -424,7 +428,7 @@ unsigned int fls_u32(uint32_t x) { int r; - asm("bsrl %1,%0\n\t" + __asm__ ("bsrl %1,%0\n\t" "jnz 1f\n\t" "movl $-1,%0\n\t" "1:\n\t" @@ -440,7 +444,7 @@ unsigned int fls_u64(uint64_t x) { long r; - asm("bsrq %1,%0\n\t" + __asm__ ("bsrq %1,%0\n\t" "jnz 1f\n\t" "movq $-1,%0\n\t" "1:\n\t" @@ -561,6 +565,7 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size, static long nr_cpus_mask = -1; static long split_count_mask = -1; +static int split_count_order = -1; #if defined(HAVE_SYSCONF) static void ht_init_nr_cpus_mask(void) @@ -589,20 +594,21 @@ static void ht_init_nr_cpus_mask(void) static void alloc_split_items_count(struct cds_lfht *ht) { - struct ht_items_count *count; - if (nr_cpus_mask == -1) { ht_init_nr_cpus_mask(); if (nr_cpus_mask < 0) split_count_mask = DEFAULT_SPLIT_COUNT_MASK; else split_count_mask = nr_cpus_mask; + split_count_order = + cds_lfht_get_count_order_ulong(split_count_mask + 1); } assert(split_count_mask >= 0); if (ht->flags & CDS_LFHT_ACCOUNTING) { - ht->split_count = calloc(split_count_mask + 1, sizeof(*count)); + ht->split_count = calloc(split_count_mask + 1, + sizeof(struct ht_items_count)); assert(ht->split_count); } else { ht->split_count = NULL; @@ -615,26 +621,18 @@ void free_split_items_count(struct cds_lfht *ht) poison_free(ht->split_count); } -#if defined(HAVE_SCHED_GETCPU) static int ht_get_split_count_index(unsigned long hash) { int cpu; assert(split_count_mask >= 0); - cpu = sched_getcpu(); + cpu = urcu_sched_getcpu(); if (caa_unlikely(cpu < 0)) return hash & split_count_mask; else return cpu & split_count_mask; } -#else /* #if defined(HAVE_SCHED_GETCPU) */ -static -int ht_get_split_count_index(unsigned long hash) -{ - return hash & split_count_mask; -} -#endif /* #else #if defined(HAVE_SCHED_GETCPU) */ static void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash) @@ -712,14 +710,39 @@ void check_resize(struct cds_lfht *ht, unsigned long size, uint32_t chain_len) * Use bucket-local length for small table expand and for * environments lacking per-cpu data support. 
*/ - if (count >= (1UL << COUNT_COMMIT_ORDER)) + if (count >= (1UL << (COUNT_COMMIT_ORDER + split_count_order))) return; if (chain_len > 100) dbg_printf("WARNING: large chain length: %u.\n", chain_len); - if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD) - cds_lfht_resize_lazy_grow(ht, size, - cds_lfht_get_count_order_u32(chain_len - (CHAIN_LEN_TARGET - 1))); + if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD) { + int growth; + + /* + * Ideal growth calculated based on chain length. + */ + growth = cds_lfht_get_count_order_u32(chain_len + - (CHAIN_LEN_TARGET - 1)); + if ((ht->flags & CDS_LFHT_ACCOUNTING) + && (size << growth) + >= (1UL << (COUNT_COMMIT_ORDER + + split_count_order))) { + /* + * If ideal growth expands the hash table size + * beyond the "small hash table" sizes, use the + * maximum small hash table size to attempt + * expanding the hash table. This only applies + * when node accounting is available, otherwise + * the chain length is used to expand the hash + * table in every case. + */ + growth = COUNT_COMMIT_ORDER + split_count_order + - cds_lfht_get_count_order_ulong(size); + if (growth <= 0) + return; + } + cds_lfht_resize_lazy_grow(ht, size, growth); + } } static @@ -734,12 +757,6 @@ int is_removed(struct cds_lfht_node *node) return ((unsigned long) node) & REMOVED_FLAG; } -static -struct cds_lfht_node *flag_removed(struct cds_lfht_node *node) -{ - return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG); -} - static int is_bucket(struct cds_lfht_node *node) { @@ -764,6 +781,12 @@ struct cds_lfht_node *flag_removal_owner(struct cds_lfht_node *node) return (struct cds_lfht_node *) (((unsigned long) node) | REMOVAL_OWNER_FLAG); } +static +struct cds_lfht_node *flag_removed_or_removal_owner(struct cds_lfht_node *node) +{ + return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG | REMOVAL_OWNER_FLAG); +} + static struct cds_lfht_node *get_end(void) { @@ -832,13 +855,16 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod assert(!is_bucket(bucket)); assert(!is_removed(bucket)); + assert(!is_removal_owner(bucket)); assert(!is_bucket(node)); assert(!is_removed(node)); + assert(!is_removal_owner(node)); for (;;) { iter_prev = bucket; /* We can always skip the bucket node initially */ iter = rcu_dereference(iter_prev->next); assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); assert(iter_prev->reverse_hash <= node->reverse_hash); /* * We should never be called with bucket (start of chain) @@ -859,6 +885,7 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod iter = next; } assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); if (is_bucket(iter)) new_next = flag_bucket(clear_flag(next)); else @@ -879,8 +906,10 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, return -ENOENT; assert(!is_removed(old_node)); + assert(!is_removal_owner(old_node)); assert(!is_bucket(old_node)); assert(!is_removed(new_node)); + assert(!is_removal_owner(new_node)); assert(!is_bucket(new_node)); assert(new_node != old_node); for (;;) { @@ -894,6 +923,12 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, } assert(old_next == clear_flag(old_next)); assert(new_node != old_next); + /* + * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED + * flag. It is either set atomically at the same time + * (replace) or after (del). 
+ */ + assert(!is_removal_owner(old_next)); new_node->next = old_next; /* * Here is the whole trick for lock-free replace: we add @@ -906,10 +941,12 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, * the old node, but will not see the new one. * This is a replacement of a node with another node * that has the same value: we are therefore not - * removing a value from the hash table. + * removing a value from the hash table. We set both the + * REMOVED and REMOVAL_OWNER flags atomically so we own + * the node after successful cmpxchg. */ ret_next = uatomic_cmpxchg(&old_node->next, - old_next, flag_removed(new_node)); + old_next, flag_removed_or_removal_owner(new_node)); if (ret_next == old_next) break; /* We performed the replacement. */ old_next = ret_next; @@ -923,7 +960,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash)); _cds_lfht_gc_bucket(bucket, new_node); - assert(is_removed(rcu_dereference(old_node->next))); + assert(is_removed(CMM_LOAD_SHARED(old_node->next))); return 0; } @@ -947,6 +984,7 @@ void _cds_lfht_add(struct cds_lfht *ht, assert(!is_bucket(node)); assert(!is_removed(node)); + assert(!is_removal_owner(node)); bucket = lookup_bucket(ht, size, hash); for (;;) { uint32_t chain_len = 0; @@ -1007,7 +1045,9 @@ void _cds_lfht_add(struct cds_lfht *ht, insert: assert(node != clear_flag(iter)); assert(!is_removed(iter_prev)); + assert(!is_removal_owner(iter_prev)); assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); assert(iter_prev != node); if (!bucket_flag) node->next = clear_flag(iter); @@ -1027,6 +1067,7 @@ void _cds_lfht_add(struct cds_lfht *ht, gc_node: assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); if (is_bucket(iter)) new_next = flag_bucket(clear_flag(next)); else @@ -1061,10 +1102,15 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, * logical removal flag). Return -ENOENT if the node had * previously been removed. */ - next = rcu_dereference(node->next); + next = CMM_LOAD_SHARED(node->next); /* next is not dereferenced */ if (caa_unlikely(is_removed(next))) return -ENOENT; assert(!is_bucket(next)); + /* + * The del operation semantic guarantees a full memory barrier + * before the uatomic_or atomic commit of the deletion flag. + */ + cmm_smp_mb__before_uatomic_or(); /* * We set the REMOVED_FLAG unconditionally. Note that there may * be more than one concurrent thread setting this flag. @@ -1082,7 +1128,7 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash)); _cds_lfht_gc_bucket(bucket, node); - assert(is_removed(rcu_dereference(node->next))); + assert(is_removed(CMM_LOAD_SHARED(node->next))); /* * Last phase: atomically exchange node->next with a version * having "REMOVAL_OWNER_FLAG" set. If the returned node->next @@ -1119,11 +1165,15 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, void (*fct)(struct cds_lfht *ht, unsigned long i, unsigned long start, unsigned long len)) { - unsigned long partition_len; + unsigned long partition_len, start = 0; struct partition_resize_work *work; int thread, ret; unsigned long nr_threads; + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) + goto fallback; + /* * Note: nr_cpus_mask + 1 is always power of 2. 
* We spawn just the number of threads we need to satisfy the minimum @@ -1137,7 +1187,10 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, } partition_len = len >> cds_lfht_get_count_order_ulong(nr_threads); work = calloc(nr_threads, sizeof(*work)); - assert(work); + if (!work) { + dbg_printf("error allocating for resize, single-threading\n"); + goto fallback; + } for (thread = 0; thread < nr_threads; thread++) { work[thread].ht = ht; work[thread].i = i; @@ -1146,6 +1199,17 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, work[thread].fct = fct; ret = pthread_create(&(work[thread].thread_id), ht->resize_attr, partition_resize_thread, &work[thread]); + if (ret == EAGAIN) { + /* + * Out of resources: wait and join the threads + * we've created, then handle leftovers. + */ + dbg_printf("error spawning for resize, single-threading\n"); + start = work[thread].start; + len -= start; + nr_threads = thread; + break; + } assert(!ret); } for (thread = 0; thread < nr_threads; thread++) { @@ -1153,6 +1217,18 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, assert(!ret); } free(work); + + /* + * A pthread_create failure above will either lead in us having + * no threads to join or starting at a non-zero offset, + * fallback to single thread processing of leftovers. + */ + if (start == 0 && nr_threads > 0) + return; +fallback: + ht->flavor->thread_online(); + fct(ht, i, start, len); + ht->flavor->thread_offline(); } /* @@ -1190,13 +1266,6 @@ static void init_table_populate(struct cds_lfht *ht, unsigned long i, unsigned long len) { - assert(nr_cpus_mask != -1); - if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) { - ht->flavor->thread_online(); - init_table_populate_partition(ht, i, 0, len); - ht->flavor->thread_offline(); - return; - } partition_resize_helper(ht, i, len, init_table_populate_partition); } @@ -1289,14 +1358,6 @@ void remove_table_partition(struct cds_lfht *ht, unsigned long i, static void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) { - - assert(nr_cpus_mask != -1); - if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) { - ht->flavor->thread_online(); - remove_table_partition(ht, i, 0, len); - ht->flavor->thread_offline(); - return; - } partition_resize_helper(ht, i, len, remove_table_partition); } @@ -1319,7 +1380,7 @@ void fini_table(struct cds_lfht *ht, unsigned long len; len = 1UL << (i - 1); - dbg_printf("fini order %lu len: %lu\n", i, len); + dbg_printf("fini order %ld len: %lu\n", i, len); /* Stop shrink if the resize target changes under us */ if (CMM_LOAD_SHARED(ht->resize_target) > (1UL << (i - 1))) @@ -1510,7 +1571,7 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash, } node = clear_flag(next); } - assert(!node || !is_bucket(rcu_dereference(node->next))); + assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } @@ -1543,7 +1604,7 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match, } node = clear_flag(next); } - assert(!node || !is_bucket(rcu_dereference(node->next))); + assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } @@ -1565,7 +1626,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter) } node = clear_flag(next); } - assert(!node || !is_bucket(rcu_dereference(node->next))); + assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } @@ -1654,12 +1715,14 @@ int cds_lfht_replace(struct 
cds_lfht *ht, int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node) { - unsigned long size, hash; + unsigned long size; int ret; size = rcu_dereference(ht->size); ret = _cds_lfht_del(ht, size, node); if (!ret) { + unsigned long hash; + hash = bit_reverse_ulong(node->reverse_hash); ht_count_del(ht, size, hash); } @@ -1668,7 +1731,7 @@ int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node) int cds_lfht_is_node_deleted(struct cds_lfht_node *node) { - return is_removed(rcu_dereference(node->next)); + return is_removed(CMM_LOAD_SHARED(node->next)); } static @@ -1684,6 +1747,7 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) if (!is_bucket(node)) return -EPERM; assert(!is_removed(node)); + assert(!is_removal_owner(node)); } while (!is_end(node)); /* * size accessed without rcu_dereference because hash table is @@ -1710,20 +1774,36 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) */ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) { - int ret; + int ret, was_online; /* Wait for in-flight resize operations to complete */ _CMM_STORE_SHARED(ht->in_progress_destroy, 1); cmm_smp_mb(); /* Store destroy before load resize */ + was_online = ht->flavor->read_ongoing(); + if (was_online) + ht->flavor->thread_offline(); + /* Calling with RCU read-side held is an error. */ + if (ht->flavor->read_ongoing()) { + ret = -EINVAL; + if (was_online) + ht->flavor->thread_online(); + goto end; + } while (uatomic_read(&ht->in_progress_resize)) poll(NULL, 0, 100); /* wait for 100ms */ + if (was_online) + ht->flavor->thread_online(); ret = cds_lfht_delete_bucket(ht); if (ret) return ret; free_split_items_count(ht); if (attr) *attr = ht->resize_attr; + ret = pthread_mutex_destroy(&ht->resize_mutex); + if (ret) + ret = -EBUSY; poison_free(ht); +end: return ret; } @@ -1852,13 +1932,30 @@ void resize_target_update_count(struct cds_lfht *ht, void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size) { + int was_online; + + was_online = ht->flavor->read_ongoing(); + if (was_online) + ht->flavor->thread_offline(); + /* Calling with RCU read-side held is an error. */ + if (ht->flavor->read_ongoing()) { + static int print_once; + + if (!CMM_LOAD_SHARED(print_once)) + fprintf(stderr, "[error] rculfhash: cds_lfht_resize " + "called with RCU read-side lock held.\n"); + CMM_STORE_SHARED(print_once, 1); + assert(0); + goto end; + } resize_target_update_count(ht, new_size); CMM_STORE_SHARED(ht->resize_initiated, 1); - ht->flavor->thread_offline(); pthread_mutex_lock(&ht->resize_mutex); _do_cds_lfht_resize(ht); pthread_mutex_unlock(&ht->resize_mutex); - ht->flavor->thread_online(); +end: + if (was_online) + ht->flavor->thread_online(); } static
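/*
 * Illustrative sketch (not part of the patch above): the REMOVED /
 * REMOVAL_OWNER handshake that several hunks in this diff rely on,
 * modeled stand-alone with C11 atomics instead of the library's
 * uatomic_*() helpers.  The struct, the demo_* function names and the
 * flag bit positions below are assumptions made for illustration only;
 * see rculfhash.c for the real definitions.
 */
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define REMOVED_FLAG		((uintptr_t) 1 << 0)
#define REMOVAL_OWNER_FLAG	((uintptr_t) 1 << 2)	/* assumed bit positions */

struct demo_node {
	_Atomic uintptr_t next;	/* successor pointer, flags in low bits */
};

/* Logical removal: idempotent, any number of threads may set it. */
static void demo_mark_removed(struct demo_node *node)
{
	atomic_fetch_or(&node->next, REMOVED_FLAG);
}

/*
 * Claim removal ownership after logical removal: the single thread
 * whose compare-and-swap transitions next from "REMOVED only" to
 * "REMOVED | REMOVAL_OWNER" is the one allowed to reclaim the node.
 * This mirrors the last phase of _cds_lfht_del in the patch, whereas
 * _cds_lfht_replace takes ownership in one step by installing the new
 * node with both flags already set (flag_removed_or_removal_owner()),
 * which is why the patch can assert that REMOVAL_OWNER is never seen
 * without REMOVED.
 */
static bool demo_claim_removal(struct demo_node *node)
{
	uintptr_t old = atomic_load(&node->next);

	assert(old & REMOVED_FLAG);
	for (;;) {
		if (old & REMOVAL_OWNER_FLAG)
			return false;	/* another thread owns the removal */
		if (atomic_compare_exchange_weak(&node->next, &old,
						old | REMOVAL_OWNER_FLAG))
			return true;	/* owner: free after a grace period */
	}
}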