X-Git-Url: http://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=rculfhash.c;h=57f1a04d46dd3d446ca88fa74267660dd7a26fe2;hp=bdf9fc000c4abe7b596c56eda705b3ca24ab041a;hb=d7f3ba4c16b6e855b517f3d36f4bb620f4fd2677;hpb=5bc6b66f4dd5e21dc30d647760530a3ce06f4c2f diff --git a/rculfhash.c b/rculfhash.c index bdf9fc0..57f1a04 100644 --- a/rculfhash.c +++ b/rculfhash.c @@ -34,24 +34,31 @@ * implementation: * * - RCU read-side critical section allows readers to perform hash - * table lookups and use the returned objects safely by delaying - * memory reclaim of a grace period. + * table lookups, as well as traversals, and use the returned objects + * safely by allowing memory reclaim to take place only after a grace + * period. * - Add and remove operations are lock-free, and do not need to * allocate memory. They need to be executed within RCU read-side * critical section to ensure the objects they read are valid and to * deal with the cmpxchg ABA problem. * - add and add_unique operations are supported. add_unique checks if - * the node key already exists in the hash table. It ensures no key - * duplicata exists. - * - The resize operation executes concurrently with add/remove/lookup. + * the node key already exists in the hash table. It ensures not to + * populate a duplicate key if the node key already exists in the hash + * table. + * - The resize operation executes concurrently with + * add/add_unique/add_replace/remove/lookup/traversal. * - Hash table nodes are contained within a split-ordered list. This * list is ordered by incrementing reversed-bits-hash value. * - An index of bucket nodes is kept. These bucket nodes are the hash - * table "buckets", and they are also chained together in the - * split-ordered list, which allows recursive expansion. - * - The resize operation for small tables only allows expanding the hash table. - * It is triggered automatically by detecting long chains in the add - * operation. + * table "buckets". These buckets are internal nodes that allow to + * perform a fast hash lookup, similarly to a skip list. These + * buckets are chained together in the split-ordered list, which + * allows recursive expansion by inserting new buckets between the + * existing buckets. The split-ordered list allows adding new buckets + * between existing buckets as the table needs to grow. + * - The resize operation for small tables only allows expanding the + * hash table. It is triggered automatically by detecting long chains + * in the add operation. * - The resize operation for larger tables (and available through an * API) allows both expanding and shrinking the hash table. * - Split-counters are used to keep track of the number of @@ -71,32 +78,131 @@ * (not visible to lookups anymore) before the RCU read-side critical * section held across removal ends. Furthermore, this ensures that * the node with "removed" flag set is removed from the linked-list - * before its memory is reclaimed. Only the thread which removal - * successfully set the "removed" flag (with a cmpxchg) into a node's - * next pointer is considered to have succeeded its removal (and thus - * owns the node to reclaim). 
Because we garbage-collect starting from - * an invariant node (the start-of-bucket bucket node) up to the - * "removed" node (or find a reverse-hash that is higher), we are sure - * that a successful traversal of the chain leads to a chain that is - * present in the linked-list (the start node is never removed) and - * that is does not contain the "removed" node anymore, even if - * concurrent delete/add operations are changing the structure of the - * list concurrently. - * - The add operation performs gargage collection of buckets if it - * encounters nodes with removed flag set in the bucket where it wants - * to add its new node. This ensures lock-freedom of add operation by + * before its memory is reclaimed. After setting the "removal" flag, + * only the thread which removal is the first to set the "removal + * owner" flag (with an xchg) into a node's next pointer is considered + * to have succeeded its removal (and thus owns the node to reclaim). + * Because we garbage-collect starting from an invariant node (the + * start-of-bucket bucket node) up to the "removed" node (or find a + * reverse-hash that is higher), we are sure that a successful + * traversal of the chain leads to a chain that is present in the + * linked-list (the start node is never removed) and that it does not + * contain the "removed" node anymore, even if concurrent delete/add + * operations are changing the structure of the list concurrently. + * - The add operations perform garbage collection of buckets if they + * encounter nodes with removed flag set in the bucket where they want + * to add their new node. This ensures lock-freedom of add operation by * helping the remover unlink nodes from the list rather than to wait * for it do to so. - * - A RCU "order table" indexed by log2(hash index) is copied and - * expanded by the resize operation. This order table allows finding - * the "bucket node" tables. - * - There is one bucket node table per hash index order. The size of - * each bucket node table is half the number of hashes contained in - * this order (except for order 0). - * - synchronzie_rcu is used to garbage-collect the old bucket node table. - * - The per-order bucket node tables contain a compact version of the - * hash table nodes. These tables are invariant after they are - * populated into the hash table. + * - There are three memory backends for the hash table buckets: the + * "order table", the "chunks", and the "mmap". + * - These bucket containers contain a compact version of the hash table + * nodes. + * - The RCU "order table": + * - has a first level table indexed by log2(hash index) which is + * copied and expanded by the resize operation. This order table + * allows finding the "bucket node" tables. + * - There is one bucket node table per hash index order. The size of + * each bucket node table is half the number of hashes contained in + * this order (except for order 0). + * - The RCU "chunks" is best suited for close interaction with a page + * allocator. It uses a linear array as index to "chunks" containing + * each the same number of buckets. + * - The RCU "mmap" memory backend uses a single memory map to hold + * all buckets. + * - synchronize_rcu is used to garbage-collect the old bucket node table. + * + * Ordering Guarantees: + * + * To discuss these guarantees, we first define "read" operation as any + * of the the basic cds_lfht_lookup, cds_lfht_next_duplicate, + * cds_lfht_first, cds_lfht_next operation, as well as + * cds_lfht_add_unique (failure). 
+ * + * We define "read traversal" operation as any of the following + * group of operations + * - cds_lfht_lookup followed by iteration with cds_lfht_next_duplicate + * (and/or cds_lfht_next, although less common). + * - cds_lfht_add_unique (failure) followed by iteration with + * cds_lfht_next_duplicate (and/or cds_lfht_next, although less + * common). + * - cds_lfht_first followed iteration with cds_lfht_next (and/or + * cds_lfht_next_duplicate, although less common). + * + * We define "write" operations as any of cds_lfht_add, cds_lfht_replace, + * cds_lfht_add_unique (success), cds_lfht_add_replace, cds_lfht_del. + * + * When cds_lfht_add_unique succeeds (returns the node passed as + * parameter), it acts as a "write" operation. When cds_lfht_add_unique + * fails (returns a node different from the one passed as parameter), it + * acts as a "read" operation. A cds_lfht_add_unique failure is a + * cds_lfht_lookup "read" operation, therefore, any ordering guarantee + * referring to "lookup" imply any of "lookup" or cds_lfht_add_unique + * (failure). + * + * We define "prior" and "later" node as nodes observable by reads and + * read traversals respectively before and after a write or sequence of + * write operations. + * + * Hash-table operations are often cascaded, for example, the pointer + * returned by a cds_lfht_lookup() might be passed to a cds_lfht_next(), + * whose return value might in turn be passed to another hash-table + * operation. This entire cascaded series of operations must be enclosed + * by a pair of matching rcu_read_lock() and rcu_read_unlock() + * operations. + * + * The following ordering guarantees are offered by this hash table: + * + * A.1) "read" after "write": if there is ordering between a write and a + * later read, then the read is guaranteed to see the write or some + * later write. + * A.2) "read traversal" after "write": given that there is dependency + * ordering between reads in a "read traversal", if there is + * ordering between a write and the first read of the traversal, + * then the "read traversal" is guaranteed to see the write or + * some later write. + * B.1) "write" after "read": if there is ordering between a read and a + * later write, then the read will never see the write. + * B.2) "write" after "read traversal": given that there is dependency + * ordering between reads in a "read traversal", if there is + * ordering between the last read of the traversal and a later + * write, then the "read traversal" will never see the write. + * C) "write" while "read traversal": if a write occurs during a "read + * traversal", the traversal may, or may not, see the write. + * D.1) "write" after "write": if there is ordering between a write and + * a later write, then the later write is guaranteed to see the + * effects of the first write. + * D.2) Concurrent "write" pairs: The system will assign an arbitrary + * order to any pair of concurrent conflicting writes. + * Non-conflicting writes (for example, to different keys) are + * unordered. + * E) If a grace period separates a "del" or "replace" operation + * and a subsequent operation, then that subsequent operation is + * guaranteed not to see the removed item. + * F) Uniqueness guarantee: given a hash table that does not contain + * duplicate items for a given key, there will only be one item in + * the hash table after an arbitrary sequence of add_unique and/or + * add_replace operations. 
Note, however, that a pair of + * concurrent read operations might well access two different items + * with that key. + * G.1) If a pair of lookups for a given key are ordered (e.g. by a + * memory barrier), then the second lookup will return the same + * node as the previous lookup, or some later node. + * G.2) A "read traversal" that starts after the end of a prior "read + * traversal" (ordered by memory barriers) is guaranteed to see the + * same nodes as the previous traversal, or some later nodes. + * G.3) Concurrent "read" pairs: concurrent reads are unordered. For + * example, if a pair of reads to the same key run concurrently + * with an insertion of that same key, the reads remain unordered + * regardless of their return values. In other words, you cannot + * rely on the values returned by the reads to deduce ordering. + * + * Progress guarantees: + * + * * Reads are wait-free. These operations always move forward in the + * hash table linked list, and this list has no loop. + * * Writes are lock-free. Any retry loop performed by a write operation + * is triggered by progress made within another update operation. * * Bucket node tables: * @@ -120,8 +226,8 @@ * * A bit of ascii art explanation: * - * Order index is the off-by-one compare to the actual power of 2 because - * we use index 0 to deal with the 0 special-case. + * The order index is the off-by-one compared to the actual power of 2 + * because we use index 0 to deal with the 0 special-case. * * This shows the nodes for a small table ordered by reversed bits: * @@ -150,12 +256,14 @@ */ #define _LGPL_SOURCE +#define _GNU_SOURCE #include #include #include #include #include #include +#include #include "config.h" #include @@ -173,7 +281,7 @@ * Split-counters lazily update the global counter each 1024 * addition/removal. It automatically keeps track of resize required. * We use the bucket length as indicator for need to expand for small - * tables and machines lacking per-cpu data suppport. + * tables and machines lacking per-cpu data support. */ #define COUNT_COMMIT_ORDER 10 #define DEFAULT_SPLIT_COUNT_MASK 0xFUL @@ -198,10 +306,16 @@ * removal, and that node garbage collection must be performed. * The bucket flag does not require to be updated atomically with the * pointer, but it is added as a pointer low bit flag to save space. + * The "removal owner" flag is used to detect which of the "del" + * operation that has set the "removed flag" gets to return the removed + * node to its caller. Note that the replace operation does not need to + * iteract with the "removal owner" flag, because it validates that + * the "removed" flag is not set before performing its cmpxchg. */ #define REMOVED_FLAG (1UL << 0) #define BUCKET_FLAG (1UL << 1) -#define FLAGS_MASK ((1UL << 2) - 1) +#define REMOVAL_OWNER_FLAG (1UL << 2) +#define FLAGS_MASK ((1UL << 3) - 1) /* Value of the end pointer. Should not interact with flags. 
*/ #define END_VALUE NULL @@ -267,7 +381,8 @@ uint8_t bit_reverse_u8(uint8_t v) return BitReverseTable256[v]; } -static __attribute__((unused)) +#if (CAA_BITS_PER_LONG == 32) +static uint32_t bit_reverse_u32(uint32_t v) { return ((uint32_t) bit_reverse_u8(v) << 24) | @@ -275,8 +390,8 @@ uint32_t bit_reverse_u32(uint32_t v) ((uint32_t) bit_reverse_u8(v >> 16) << 8) | ((uint32_t) bit_reverse_u8(v >> 24)); } - -static __attribute__((unused)) +#else +static uint64_t bit_reverse_u64(uint64_t v) { return ((uint64_t) bit_reverse_u8(v) << 56) | @@ -288,6 +403,7 @@ uint64_t bit_reverse_u64(uint64_t v) ((uint64_t) bit_reverse_u8(v >> 48) << 8) | ((uint64_t) bit_reverse_u8(v >> 56)); } +#endif static unsigned long bit_reverse_ulong(unsigned long v) @@ -447,6 +563,7 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size, static long nr_cpus_mask = -1; static long split_count_mask = -1; +static int split_count_order = -1; #if defined(HAVE_SYSCONF) static void ht_init_nr_cpus_mask(void) @@ -475,20 +592,21 @@ static void ht_init_nr_cpus_mask(void) static void alloc_split_items_count(struct cds_lfht *ht) { - struct ht_items_count *count; - if (nr_cpus_mask == -1) { ht_init_nr_cpus_mask(); if (nr_cpus_mask < 0) split_count_mask = DEFAULT_SPLIT_COUNT_MASK; else split_count_mask = nr_cpus_mask; + split_count_order = + cds_lfht_get_count_order_ulong(split_count_mask + 1); } assert(split_count_mask >= 0); if (ht->flags & CDS_LFHT_ACCOUNTING) { - ht->split_count = calloc(split_count_mask + 1, sizeof(*count)); + ht->split_count = calloc(split_count_mask + 1, + sizeof(struct ht_items_count)); assert(ht->split_count); } else { ht->split_count = NULL; @@ -598,14 +716,39 @@ void check_resize(struct cds_lfht *ht, unsigned long size, uint32_t chain_len) * Use bucket-local length for small table expand and for * environments lacking per-cpu data support. */ - if (count >= (1UL << COUNT_COMMIT_ORDER)) + if (count >= (1UL << (COUNT_COMMIT_ORDER + split_count_order))) return; if (chain_len > 100) dbg_printf("WARNING: large chain length: %u.\n", chain_len); - if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD) - cds_lfht_resize_lazy_grow(ht, size, - cds_lfht_get_count_order_u32(chain_len - (CHAIN_LEN_TARGET - 1))); + if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD) { + int growth; + + /* + * Ideal growth calculated based on chain length. + */ + growth = cds_lfht_get_count_order_u32(chain_len + - (CHAIN_LEN_TARGET - 1)); + if ((ht->flags & CDS_LFHT_ACCOUNTING) + && (size << growth) + >= (1UL << (COUNT_COMMIT_ORDER + + split_count_order))) { + /* + * If ideal growth expands the hash table size + * beyond the "small hash table" sizes, use the + * maximum small hash table size to attempt + * expanding the hash table. This only applies + * when node accounting is available, otherwise + * the chain length is used to expand the hash + * table in every case. 
+ */ + growth = COUNT_COMMIT_ORDER + split_count_order + - cds_lfht_get_count_order_ulong(size); + if (growth <= 0) + return; + } + cds_lfht_resize_lazy_grow(ht, size, growth); + } } static @@ -620,12 +763,6 @@ int is_removed(struct cds_lfht_node *node) return ((unsigned long) node) & REMOVED_FLAG; } -static -struct cds_lfht_node *flag_removed(struct cds_lfht_node *node) -{ - return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG); -} - static int is_bucket(struct cds_lfht_node *node) { @@ -638,6 +775,24 @@ struct cds_lfht_node *flag_bucket(struct cds_lfht_node *node) return (struct cds_lfht_node *) (((unsigned long) node) | BUCKET_FLAG); } +static +int is_removal_owner(struct cds_lfht_node *node) +{ + return ((unsigned long) node) & REMOVAL_OWNER_FLAG; +} + +static +struct cds_lfht_node *flag_removal_owner(struct cds_lfht_node *node) +{ + return (struct cds_lfht_node *) (((unsigned long) node) | REMOVAL_OWNER_FLAG); +} + +static +struct cds_lfht_node *flag_removed_or_removal_owner(struct cds_lfht_node *node) +{ + return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG | REMOVAL_OWNER_FLAG); +} + static struct cds_lfht_node *get_end(void) { @@ -706,13 +861,16 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod assert(!is_bucket(bucket)); assert(!is_removed(bucket)); + assert(!is_removal_owner(bucket)); assert(!is_bucket(node)); assert(!is_removed(node)); + assert(!is_removal_owner(node)); for (;;) { iter_prev = bucket; /* We can always skip the bucket node initially */ iter = rcu_dereference(iter_prev->next); assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); assert(iter_prev->reverse_hash <= node->reverse_hash); /* * We should never be called with bucket (start of chain) @@ -733,13 +891,13 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod iter = next; } assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); if (is_bucket(iter)) new_next = flag_bucket(clear_flag(next)); else new_next = clear_flag(next); (void) uatomic_cmpxchg(&iter_prev->next, iter, new_next); } - return; } static @@ -754,8 +912,10 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, return -ENOENT; assert(!is_removed(old_node)); + assert(!is_removal_owner(old_node)); assert(!is_bucket(old_node)); assert(!is_removed(new_node)); + assert(!is_removal_owner(new_node)); assert(!is_bucket(new_node)); assert(new_node != old_node); for (;;) { @@ -767,9 +927,15 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, */ return -ENOENT; } - assert(!is_bucket(old_next)); - assert(new_node != clear_flag(old_next)); - new_node->next = clear_flag(old_next); + assert(old_next == clear_flag(old_next)); + assert(new_node != old_next); + /* + * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED + * flag. It is either set atomically at the same time + * (replace) or after (del). + */ + assert(!is_removal_owner(old_next)); + new_node->next = old_next; /* * Here is the whole trick for lock-free replace: we add * the replacement node _after_ the node we want to @@ -779,9 +945,14 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, * next pointer, they will either skip the old node due * to the removal flag and see the new node, or use * the old node, but will not see the new one. + * This is a replacement of a node with another node + * that has the same value: we are therefore not + * removing a value from the hash table. 
We set both the + * REMOVED and REMOVAL_OWNER flags atomically so we own + * the node after successful cmpxchg. */ ret_next = uatomic_cmpxchg(&old_node->next, - old_next, flag_removed(new_node)); + old_next, flag_removed_or_removal_owner(new_node)); if (ret_next == old_next) break; /* We performed the replacement. */ old_next = ret_next; @@ -795,7 +966,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash)); _cds_lfht_gc_bucket(bucket, new_node); - assert(is_removed(rcu_dereference(old_node->next))); + assert(is_removed(CMM_LOAD_SHARED(old_node->next))); return 0; } @@ -805,6 +976,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, */ static void _cds_lfht_add(struct cds_lfht *ht, + unsigned long hash, cds_lfht_match_fct match, const void *key, unsigned long size, @@ -818,7 +990,8 @@ void _cds_lfht_add(struct cds_lfht *ht, assert(!is_bucket(node)); assert(!is_removed(node)); - bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash)); + assert(!is_removal_owner(node)); + bucket = lookup_bucket(ht, size, hash); for (;;) { uint32_t chain_len = 0; @@ -856,8 +1029,8 @@ void _cds_lfht_add(struct cds_lfht *ht, * * This semantic ensures no duplicated keys * should ever be observable in the table - * (including observe one node by one node - * by forward iterations) + * (including traversing the table node by + * node by forward iterations) */ cds_lfht_next_duplicate(ht, match, key, &d_iter); if (!d_iter.node) @@ -878,7 +1051,9 @@ void _cds_lfht_add(struct cds_lfht *ht, insert: assert(node != clear_flag(iter)); assert(!is_removed(iter_prev)); + assert(!is_removal_owner(iter_prev)); assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); assert(iter_prev != node); if (!bucket_flag) node->next = clear_flag(iter); @@ -898,6 +1073,7 @@ void _cds_lfht_add(struct cds_lfht *ht, gc_node: assert(!is_removed(iter)); + assert(!is_removal_owner(iter)); if (is_bucket(iter)) new_next = flag_bucket(clear_flag(next)); else @@ -914,10 +1090,9 @@ end: static int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, - struct cds_lfht_node *node, - int bucket_removal) + struct cds_lfht_node *node) { - struct cds_lfht_node *bucket, *next, *old; + struct cds_lfht_node *bucket, *next; if (!node) /* Return -ENOENT if asked to delete NULL node */ return -ENOENT; @@ -925,20 +1100,30 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, /* logically delete the node */ assert(!is_bucket(node)); assert(!is_removed(node)); - old = rcu_dereference(node->next); - do { - struct cds_lfht_node *new_next; + assert(!is_removal_owner(node)); - next = old; - if (caa_unlikely(is_removed(next))) - return -ENOENT; - if (bucket_removal) - assert(is_bucket(next)); - else - assert(!is_bucket(next)); - new_next = flag_removed(next); - old = uatomic_cmpxchg(&node->next, next, new_next); - } while (old != next); + /* + * We are first checking if the node had previously been + * logically removed (this check is not atomic with setting the + * logical removal flag). Return -ENOENT if the node had + * previously been removed. + */ + next = CMM_LOAD_SHARED(node->next); /* next is not dereferenced */ + if (caa_unlikely(is_removed(next))) + return -ENOENT; + assert(!is_bucket(next)); + /* + * The del operation semantic guarantees a full memory barrier + * before the uatomic_or atomic commit of the deletion flag. + */ + cmm_smp_mb__before_uatomic_or(); + /* + * We set the REMOVED_FLAG unconditionally. 
Note that there may + * be more than one concurrent thread setting this flag. + * Knowing which wins the race will be known after the garbage + * collection phase, stay tuned! + */ + uatomic_or(&node->next, REMOVED_FLAG); /* We performed the (logical) deletion. */ /* @@ -949,8 +1134,24 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash)); _cds_lfht_gc_bucket(bucket, node); - assert(is_removed(rcu_dereference(node->next))); - return 0; + assert(is_removed(CMM_LOAD_SHARED(node->next))); + /* + * Last phase: atomically exchange node->next with a version + * having "REMOVAL_OWNER_FLAG" set. If the returned node->next + * pointer did _not_ have "REMOVAL_OWNER_FLAG" set, we now own + * the node and win the removal race. + * It is interesting to note that all "add" paths are forbidden + * to change the next pointer starting from the point where the + * REMOVED_FLAG is set, so here using a read, followed by a + * xchg() suffice to guarantee that the xchg() will ever only + * set the "REMOVAL_OWNER_FLAG" (or change nothing if the flag + * was already set). + */ + if (!is_removal_owner(uatomic_xchg(&node->next, + flag_removal_owner(node->next)))) + return 0; + else + return -ENOENT; } static @@ -970,11 +1171,15 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, void (*fct)(struct cds_lfht *ht, unsigned long i, unsigned long start, unsigned long len)) { - unsigned long partition_len; + unsigned long partition_len, start = 0; struct partition_resize_work *work; int thread, ret; unsigned long nr_threads; + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) + goto fallback; + /* * Note: nr_cpus_mask + 1 is always power of 2. * We spawn just the number of threads we need to satisfy the minimum @@ -988,7 +1193,10 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, } partition_len = len >> cds_lfht_get_count_order_ulong(nr_threads); work = calloc(nr_threads, sizeof(*work)); - assert(work); + if (!work) { + dbg_printf("error allocating for resize, single-threading\n"); + goto fallback; + } for (thread = 0; thread < nr_threads; thread++) { work[thread].ht = ht; work[thread].i = i; @@ -997,6 +1205,17 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, work[thread].fct = fct; ret = pthread_create(&(work[thread].thread_id), ht->resize_attr, partition_resize_thread, &work[thread]); + if (ret == EAGAIN) { + /* + * Out of resources: wait and join the threads + * we've created, then handle leftovers. + */ + dbg_printf("error spawning for resize, single-threading\n"); + start = work[thread].start; + len -= start; + nr_threads = thread; + break; + } assert(!ret); } for (thread = 0; thread < nr_threads; thread++) { @@ -1004,6 +1223,18 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, assert(!ret); } free(work); + + /* + * A pthread_create failure above will either lead in us having + * no threads to join or starting at a non-zero offset, + * fallback to single thread processing of leftovers. 
+ */ + if (start == 0 && nr_threads > 0) + return; +fallback: + ht->flavor->thread_online(); + fct(ht, i, start, len); + ht->flavor->thread_offline(); } /* @@ -1032,7 +1263,7 @@ void init_table_populate_partition(struct cds_lfht *ht, unsigned long i, dbg_printf("init populate: order %lu index %lu hash %lu\n", i, j, j); new_node->reverse_hash = bit_reverse_ulong(j); - _cds_lfht_add(ht, NULL, NULL, size, new_node, NULL, 1); + _cds_lfht_add(ht, j, NULL, NULL, size, new_node, NULL, 1); } ht->flavor->read_unlock(); } @@ -1041,13 +1272,6 @@ static void init_table_populate(struct cds_lfht *ht, unsigned long i, unsigned long len) { - assert(nr_cpus_mask != -1); - if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) { - ht->flavor->thread_online(); - init_table_populate_partition(ht, i, 0, len); - ht->flavor->thread_offline(); - return; - } partition_resize_helper(ht, i, len, init_table_populate_partition); } @@ -1107,8 +1331,8 @@ void init_table(struct cds_lfht *ht, * removed nodes have been garbage-collected (unlinked) before call_rcu is * invoked to free a hole level of bucket nodes (after a grace period). * - * Logical removal and garbage collection can therefore be done in batch or on a - * node-per-node basis, as long as the guarantee above holds. + * Logical removal and garbage collection can therefore be done in batch + * or on a node-per-node basis, as long as the guarantee above holds. * * When we reach a certain length, we can split this removal over many worker * threads, based on the number of CPUs available in the system. This should @@ -1124,13 +1348,15 @@ void remove_table_partition(struct cds_lfht *ht, unsigned long i, assert(i > MIN_TABLE_ORDER); ht->flavor->read_lock(); for (j = size + start; j < size + start + len; j++) { - struct cds_lfht_node *fini_node = bucket_at(ht, j); + struct cds_lfht_node *fini_bucket = bucket_at(ht, j); + struct cds_lfht_node *parent_bucket = bucket_at(ht, j - size); assert(j >= size && j < (size << 1)); dbg_printf("remove entry: order %lu index %lu hash %lu\n", i, j, j); - fini_node->reverse_hash = bit_reverse_ulong(j); - (void) _cds_lfht_del(ht, size, fini_node, 1); + /* Set the REMOVED_FLAG to freeze the ->next for gc */ + uatomic_or(&fini_bucket->next, REMOVED_FLAG); + _cds_lfht_gc_bucket(parent_bucket, fini_bucket); } ht->flavor->read_unlock(); } @@ -1138,14 +1364,6 @@ void remove_table_partition(struct cds_lfht *ht, unsigned long i, static void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) { - - assert(nr_cpus_mask != -1); - if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) { - ht->flavor->thread_online(); - remove_table_partition(ht, i, 0, len); - ht->flavor->thread_offline(); - return; - } partition_resize_helper(ht, i, len, remove_table_partition); } @@ -1359,7 +1577,7 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash, } node = clear_flag(next); } - assert(!node || !is_bucket(rcu_dereference(node->next))); + assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } @@ -1392,7 +1610,7 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match, } node = clear_flag(next); } - assert(!node || !is_bucket(rcu_dereference(node->next))); + assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } @@ -1414,7 +1632,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter) } node = clear_flag(next); } - assert(!node || !is_bucket(rcu_dereference(node->next))); + assert(!node || 
!is_bucket(CMM_LOAD_SHARED(node->next))); iter->node = node; iter->next = next; } @@ -1434,9 +1652,9 @@ void cds_lfht_add(struct cds_lfht *ht, unsigned long hash, { unsigned long size; - node->reverse_hash = bit_reverse_ulong((unsigned long) hash); + node->reverse_hash = bit_reverse_ulong(hash); size = rcu_dereference(ht->size); - _cds_lfht_add(ht, NULL, NULL, size, node, NULL, 0); + _cds_lfht_add(ht, hash, NULL, NULL, size, node, NULL, 0); ht_count_add(ht, size, hash); } @@ -1449,9 +1667,9 @@ struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht, unsigned long size; struct cds_lfht_iter iter; - node->reverse_hash = bit_reverse_ulong((unsigned long) hash); + node->reverse_hash = bit_reverse_ulong(hash); size = rcu_dereference(ht->size); - _cds_lfht_add(ht, match, key, size, node, &iter, 0); + _cds_lfht_add(ht, hash, match, key, size, node, &iter, 0); if (iter.node == node) ht_count_add(ht, size, hash); return iter.node; @@ -1466,10 +1684,10 @@ struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht, unsigned long size; struct cds_lfht_iter iter; - node->reverse_hash = bit_reverse_ulong((unsigned long) hash); + node->reverse_hash = bit_reverse_ulong(hash); size = rcu_dereference(ht->size); for (;;) { - _cds_lfht_add(ht, match, key, size, node, &iter, 0); + _cds_lfht_add(ht, hash, match, key, size, node, &iter, 0); if (iter.node == node) { ht_count_add(ht, size, hash); return NULL; @@ -1480,30 +1698,48 @@ struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht, } } -int cds_lfht_replace(struct cds_lfht *ht, struct cds_lfht_iter *old_iter, +int cds_lfht_replace(struct cds_lfht *ht, + struct cds_lfht_iter *old_iter, + unsigned long hash, + cds_lfht_match_fct match, + const void *key, struct cds_lfht_node *new_node) { unsigned long size; + new_node->reverse_hash = bit_reverse_ulong(hash); + if (!old_iter->node) + return -ENOENT; + if (caa_unlikely(old_iter->node->reverse_hash != new_node->reverse_hash)) + return -EINVAL; + if (caa_unlikely(!match(old_iter->node, key))) + return -EINVAL; size = rcu_dereference(ht->size); return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next, new_node); } -int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_iter *iter) +int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node) { - unsigned long size, hash; + unsigned long size; int ret; size = rcu_dereference(ht->size); - ret = _cds_lfht_del(ht, size, iter->node, 0); + ret = _cds_lfht_del(ht, size, node); if (!ret) { - hash = bit_reverse_ulong(iter->node->reverse_hash); + unsigned long hash; + + hash = bit_reverse_ulong(node->reverse_hash); ht_count_del(ht, size, hash); } return ret; } +int cds_lfht_is_node_deleted(struct cds_lfht_node *node) +{ + return is_removed(CMM_LOAD_SHARED(node->next)); +} + static int cds_lfht_delete_bucket(struct cds_lfht *ht) { @@ -1517,13 +1753,14 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) if (!is_bucket(node)) return -EPERM; assert(!is_removed(node)); + assert(!is_removal_owner(node)); } while (!is_end(node)); /* * size accessed without rcu_dereference because hash table is * being destroyed. 
*/ size = ht->size; - /* Internal sanity check: all nodes left should be bucket */ + /* Internal sanity check: all nodes left should be buckets */ for (i = 0; i < size; i++) { node = bucket_at(ht, i); dbg_printf("delete bucket: index %lu expected hash %lu hash %lu\n", @@ -1543,13 +1780,25 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) */ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) { - int ret; + int ret, was_online; /* Wait for in-flight resize operations to complete */ _CMM_STORE_SHARED(ht->in_progress_destroy, 1); cmm_smp_mb(); /* Store destroy before load resize */ + was_online = ht->flavor->read_ongoing(); + if (was_online) + ht->flavor->thread_offline(); + /* Calling with RCU read-side held is an error. */ + if (ht->flavor->read_ongoing()) { + ret = -EINVAL; + if (was_online) + ht->flavor->thread_online(); + goto end; + } while (uatomic_read(&ht->in_progress_resize)) poll(NULL, 0, 100); /* wait for 100ms */ + if (was_online) + ht->flavor->thread_online(); ret = cds_lfht_delete_bucket(ht); if (ret) return ret; @@ -1557,17 +1806,17 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) if (attr) *attr = ht->resize_attr; poison_free(ht); +end: return ret; } void cds_lfht_count_nodes(struct cds_lfht *ht, long *approx_before, unsigned long *count, - unsigned long *removed, long *approx_after) { struct cds_lfht_node *node, *next; - unsigned long nr_bucket = 0; + unsigned long nr_bucket = 0, nr_removed = 0; *approx_before = 0; if (ht->split_count) { @@ -1580,7 +1829,6 @@ void cds_lfht_count_nodes(struct cds_lfht *ht, } *count = 0; - *removed = 0; /* Count non-bucket nodes in the table */ node = bucket_at(ht, 0); @@ -1588,7 +1836,7 @@ void cds_lfht_count_nodes(struct cds_lfht *ht, next = rcu_dereference(node->next); if (is_removed(next)) { if (!is_bucket(next)) - (*removed)++; + (nr_removed)++; else (nr_bucket)++; } else if (!is_bucket(next)) @@ -1597,6 +1845,7 @@ void cds_lfht_count_nodes(struct cds_lfht *ht, (nr_bucket)++; node = clear_flag(next); } while (!is_end(node)); + dbg_printf("number of logically removed nodes: %lu\n", nr_removed); dbg_printf("number of bucket nodes: %lu\n", nr_bucket); *approx_after = 0; if (ht->split_count) { @@ -1686,13 +1935,30 @@ void resize_target_update_count(struct cds_lfht *ht, void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size) { + int was_online; + + was_online = ht->flavor->read_ongoing(); + if (was_online) + ht->flavor->thread_offline(); + /* Calling with RCU read-side held is an error. */ + if (ht->flavor->read_ongoing()) { + static int print_once; + + if (!CMM_LOAD_SHARED(print_once)) + fprintf(stderr, "[error] rculfhash: cds_lfht_resize " + "called with RCU read-side lock held.\n"); + CMM_STORE_SHARED(print_once, 1); + assert(0); + goto end; + } resize_target_update_count(ht, new_size); CMM_STORE_SHARED(ht->resize_initiated, 1); - ht->flavor->thread_offline(); pthread_mutex_lock(&ht->resize_mutex); _do_cds_lfht_resize(ht); pthread_mutex_unlock(&ht->resize_mutex); - ht->flavor->thread_online(); +end: + if (was_online) + ht->flavor->thread_online(); } static @@ -1727,6 +1993,11 @@ void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht) return; } work = malloc(sizeof(*work)); + if (work == NULL) { + dbg_printf("error allocating resize work, bailing out\n"); + uatomic_dec(&ht->in_progress_resize); + return; + } work->ht = ht; ht->flavor->update_call_rcu(&work->head, do_resize_cb); CMM_STORE_SHARED(ht->resize_initiated, 1);
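As a complement to the ordering and progress guarantees documented above, here is a minimal lookup/traversal sketch. It is illustrative only: struct mynode, match_int() and count_key() are hypothetical names, the hash value is assumed to be computed by the caller, the calling thread is assumed to be registered with rcu_register_thread(), and the default urcu flavor is assumed (typically built with -lurcu -lurcu-cds). The point it shows is that a lookup and the cds_lfht_next_duplicate() calls chained after it form one cascaded "read traversal", so the whole sequence stays inside a single rcu_read_lock()/rcu_read_unlock() pair.

#include <urcu.h>               /* default urcu flavor */
#include <urcu/rculfhash.h>     /* RCU lock-free hash table */
#include <urcu/compiler.h>      /* caa_container_of() */

struct mynode {
        int key;                        /* example payload */
        struct rcu_head rcu_head;       /* for call_rcu() reclaim (deletion sketch below) */
        struct cds_lfht_node lfht_node; /* linkage into the split-ordered list */
};

/* Hypothetical match callback: non-zero means "matches". */
static int match_int(struct cds_lfht_node *node, const void *key)
{
        struct mynode *n = caa_container_of(node, struct mynode, lfht_node);

        return n->key == *(const int *) key;
}

/*
 * Count the nodes holding a given key. The lookup and every following
 * next_duplicate step are cascaded, so they share one read-side
 * critical section; the traversal itself is wait-free.
 */
static unsigned long count_key(struct cds_lfht *ht, unsigned long hash, int key)
{
        struct cds_lfht_iter iter;
        unsigned long count = 0;

        rcu_read_lock();
        cds_lfht_lookup(ht, hash, match_int, &key, &iter);
        while (cds_lfht_iter_get_node(&iter) != NULL) {
                count++;
                cds_lfht_next_duplicate(ht, match_int, &key, &iter);
        }
        rcu_read_unlock();
        return count;
}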
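The "removal owner" mechanism described above implies a simple calling convention for updaters, sketched here under the same assumptions as the previous example (struct mynode and match_int() are reused; insert_unique(), delete_node() and free_node_rcu() are hypothetical helpers). A cds_lfht_add_unique() call that returns a node other than the one passed in behaved as a read and found an existing key, and when several threads race to delete the same node, only the caller that gets 0 back from cds_lfht_del() owns the node and may hand it to call_rcu().

#include <stdlib.h>             /* free() */
#include <urcu.h>               /* rcu_read_lock(), call_rcu() */
#include <urcu/rculfhash.h>
#include <urcu/compiler.h>

/* struct mynode and match_int() as defined in the previous sketch. */

/*
 * Insert a node only if its key is not already present. On failure,
 * add_unique acts as a lookup and returns the pre-existing node.
 */
static struct mynode *insert_unique(struct cds_lfht *ht, unsigned long hash,
                struct mynode *n)
{
        struct cds_lfht_node *ret;

        cds_lfht_node_init(&n->lfht_node);
        rcu_read_lock();
        ret = cds_lfht_add_unique(ht, hash, match_int, &n->key, &n->lfht_node);
        rcu_read_unlock();
        if (ret != &n->lfht_node)       /* key already present: "read" semantics */
                return caa_container_of(ret, struct mynode, lfht_node);
        return NULL;                    /* inserted: "write" semantics */
}

/* Reclaim callback, invoked after a grace period. */
static void free_node_rcu(struct rcu_head *head)
{
        free(caa_container_of(head, struct mynode, rcu_head));
}

/*
 * Delete a node. Concurrent del calls on the same node may race:
 * only the "removal owner" sees 0 and reclaims; the losers get
 * -ENOENT back and must not free the node.
 */
static void delete_node(struct cds_lfht *ht, struct mynode *n)
{
        int ret;

        rcu_read_lock();
        ret = cds_lfht_del(ht, &n->lfht_node);
        rcu_read_unlock();
        if (!ret)
                call_rcu(&n->rcu_head, free_node_rcu);
}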
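The split-ordered layout itself (nodes kept sorted by the bit-reversed value of their hash, so that a new bucket always splices in between two existing ones) is easy to reproduce with a small standalone program. reverse_bits() below is a naive illustration-only helper, not the table's bit_reverse_ulong():

#include <stdio.h>
#include <stdlib.h>

#define ORDER 3                 /* 2^3 = 8 buckets */

/* Reverse the low 'bits' bits of v (naive loop, illustration only). */
static unsigned long reverse_bits(unsigned long v, unsigned int bits)
{
        unsigned long r = 0;
        unsigned int i;

        for (i = 0; i < bits; i++)
                r |= ((v >> i) & 1UL) << (bits - 1 - i);
        return r;
}

static int cmp_reversed(const void *a, const void *b)
{
        unsigned long ra = reverse_bits(*(const unsigned long *) a, ORDER);
        unsigned long rb = reverse_bits(*(const unsigned long *) b, ORDER);

        return (ra > rb) - (ra < rb);
}

int main(void)
{
        unsigned long bucket[1UL << ORDER];
        unsigned long i;

        for (i = 0; i < (1UL << ORDER); i++)
                bucket[i] = i;
        /* Sorting by reversed index yields the split-order list layout. */
        qsort(bucket, 1UL << ORDER, sizeof(bucket[0]), cmp_reversed);
        for (i = 0; i < (1UL << ORDER); i++)
                printf("bucket %lu (reversed %lu)\n", bucket[i],
                        reverse_bits(bucket[i], ORDER));
        return 0;
}

For ORDER 3 this prints the buckets as 0, 4, 2, 6, 1, 5, 3, 7, the reversed-bits chain order that the header comment's ascii art describes.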
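Finally, the checks added above treat calling cds_lfht_resize() or cds_lfht_destroy() from within a read-side critical section as an error, which suggests a table lifecycle along the following lines. This is a sketch with arbitrary sizes: max_nr_buckets is passed as 0 here, meaning no enforced maximum, and the table must already be empty (only bucket nodes left) for destroy to succeed.

#include <stdio.h>
#include <urcu.h>
#include <urcu/rculfhash.h>

int main(void)
{
        struct cds_lfht *ht;

        rcu_register_thread();

        /*
         * 8 buckets initially, at least 8 allocated, no maximum,
         * automatic resize driven by split-counter accounting.
         */
        ht = cds_lfht_new(8, 8, 0,
                        CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
        if (!ht) {
                fprintf(stderr, "cds_lfht_new failed\n");
                rcu_unregister_thread();
                return 1;
        }

        /*
         * Explicit resizes and destroy are issued outside any
         * rcu_read_lock()/rcu_read_unlock() pair.
         */
        cds_lfht_resize(ht, 64);

        /* Only bucket nodes must remain before destroying the table. */
        if (cds_lfht_destroy(ht, NULL))
                fprintf(stderr, "cds_lfht_destroy failed\n");

        rcu_unregister_thread();
        return 0;
}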