X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=rculfhash.c;h=1487980c832c115154937cda4b185024636f9e44;hp=40f6ff59be9b5427a64d9d4a9b07524a10fe7c6c;hb=b198f0fda77c816d80f63cfa2e71a7e0b4496736;hpb=c9edd44acae5da757017788908936d248a062199 diff --git a/rculfhash.c b/rculfhash.c index 40f6ff5..1487980 100644 --- a/rculfhash.c +++ b/rculfhash.c @@ -169,7 +169,6 @@ /* * Define the minimum table size. */ -//#define MIN_TABLE_SIZE 128 #define MIN_TABLE_SIZE 1 #if (CAA_BITS_PER_LONG == 32) @@ -178,6 +177,12 @@ #define MAX_TABLE_ORDER 64 #endif +/* + * Minimum number of dummy nodes to touch per thread to parallelize grow/shrink. + */ +#define MIN_PARTITION_PER_THREAD_ORDER 12 +#define MIN_PARTITION_PER_THREAD (1UL << MIN_PARTITION_PER_THREAD_ORDER) + #ifndef min #define min(a, b) ((a) < (b) ? (a) : (b)) #endif @@ -188,6 +193,8 @@ /* * The removed flag needs to be updated atomically with the pointer. + * It indicates that no node must attach to the node scheduled for + * removal, and that node garbage collection must be performed. * The dummy flag does not require to be updated atomically with the * pointer, but it is added as a pointer low bit flag to save space. */ @@ -196,10 +203,10 @@ #define FLAGS_MASK ((1UL << 2) - 1) /* Value of the end pointer. Should not interact with flags. */ -#define END_VALUE 0x4 +#define END_VALUE NULL struct ht_items_count { - unsigned long add, remove; + unsigned long add, del; } __attribute__((aligned(CAA_CACHE_LINE_SIZE))); struct rcu_level { @@ -236,6 +243,9 @@ struct cds_lfht { void (*cds_lfht_rcu_read_unlock)(void); void (*cds_lfht_rcu_thread_offline)(void); void (*cds_lfht_rcu_thread_online)(void); + void (*cds_lfht_rcu_register_thread)(void); + void (*cds_lfht_rcu_unregister_thread)(void); + pthread_attr_t *resize_attr; /* Resize threads attributes */ unsigned long count; /* global approximate item count */ struct ht_items_count *percpu_count; /* per-cpu item count */ }; @@ -245,11 +255,25 @@ struct rcu_resize_work { struct cds_lfht *ht; }; +struct partition_resize_work { + struct rcu_head head; + struct cds_lfht *ht; + unsigned long i, start, len; + void (*fct)(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len); +}; + +enum add_mode { + ADD_DEFAULT = 0, + ADD_UNIQUE = 1, + ADD_REPLACE = 2, +}; + static struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, unsigned long size, struct cds_lfht_node *node, - int unique, int dummy); + enum add_mode mode, int dummy); /* * Algorithm to reverse bits in a word by lookup table, extended to @@ -549,7 +573,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size) } static -void ht_count_remove(struct cds_lfht *ht, unsigned long size) +void ht_count_del(struct cds_lfht *ht, unsigned long size) { unsigned long percpu_count; int cpu; @@ -559,18 +583,18 @@ void ht_count_remove(struct cds_lfht *ht, unsigned long size) cpu = ht_get_cpu(); if (unlikely(cpu < 0)) return; - percpu_count = uatomic_add_return(&ht->percpu_count[cpu].remove, -1); + percpu_count = uatomic_add_return(&ht->percpu_count[cpu].del, -1); if (unlikely(!(percpu_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))) { unsigned long count; - dbg_printf("remove percpu %lu\n", percpu_count); + dbg_printf("del percpu %lu\n", percpu_count); count = uatomic_add_return(&ht->count, -(1UL << COUNT_COMMIT_ORDER)); /* If power of 2 */ if (!(count & (count - 1))) { if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size) return; - dbg_printf("remove set global %lu\n", count); + dbg_printf("del set global %lu\n", count); cds_lfht_resize_lazy_count(ht, size, count >> (CHAIN_LEN_TARGET - 1)); } @@ -598,7 +622,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size) } static -void ht_count_remove(struct cds_lfht *ht, unsigned long size) +void ht_count_del(struct cds_lfht *ht, unsigned long size) { } @@ -695,7 +719,7 @@ void cds_lfht_free_level(struct rcu_head *head) * Remove all logically deleted nodes from a bucket up to a certain node key. */ static -int _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node) +void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node) { struct cds_lfht_node *iter_prev, *iter, *next, *new_next; @@ -707,15 +731,6 @@ int _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node) iter_prev = dummy; /* We can always skip the dummy node initially */ iter = rcu_dereference(iter_prev->p.next); - if (unlikely(iter == NULL)) { - /* - * We are executing concurrently with a hash table - * expand, so we see a dummy node with NULL next value. - * Help expand by linking this node into the list and - * retry. - */ - return 1; - } assert(iter_prev->p.reverse_hash <= node->p.reverse_hash); /* * We should never be called with dummy (start of chain) @@ -725,11 +740,10 @@ int _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node) */ assert(dummy != node); for (;;) { - assert(iter != NULL); if (unlikely(is_end(iter))) - return 0; + return; if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash)) - return 0; + return; next = rcu_dereference(clear_flag(iter)->p.next); if (likely(is_removed(next))) break; @@ -741,23 +755,23 @@ int _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node) new_next = flag_dummy(clear_flag(next)); else new_next = clear_flag(next); - assert(new_next != NULL); + if (is_removed(iter)) + new_next = flag_removed(new_next); (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next); } - return 0; + return; } static struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, unsigned long size, struct cds_lfht_node *node, - int unique, int dummy) + enum add_mode mode, int dummy) { struct cds_lfht_node *iter_prev, *iter, *next, *new_node, *new_next, - *dummy_node; + *dummy_node, *return_node; struct _cds_lfht_node *lookup; unsigned long hash, index, order; - int force_dummy = 0; assert(!is_dummy(node)); assert(!is_removed(node)); @@ -780,26 +794,8 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, iter_prev = (struct cds_lfht_node *) lookup; /* We can always skip the dummy node initially */ iter = rcu_dereference(iter_prev->p.next); - if (unlikely(iter == NULL)) { - /* - * We are executing concurrently with a hash table - * expand, so we see a dummy node with NULL next value. - * Help expand by linking this node into the list and - * retry. - */ - (void) _cds_lfht_add(ht, size >> 1, iter_prev, 0, 1); - continue; /* retry */ - } assert(iter_prev->p.reverse_hash <= node->p.reverse_hash); for (;;) { - assert(iter != NULL); - /* - * When adding a dummy node, we allow concurrent - * add/removal to help. If we find the dummy node in - * place, skip its insertion. - */ - if (unlikely(dummy && clear_flag(iter) == node)) - return node; if (unlikely(is_end(iter))) goto insert; if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash)) @@ -807,12 +803,16 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, next = rcu_dereference(clear_flag(iter)->p.next); if (unlikely(is_removed(next))) goto gc_node; - if (unique + if ((mode == ADD_UNIQUE || mode == ADD_REPLACE) && !is_dummy(next) && !ht->compare_fct(node->key, node->key_len, clear_flag(iter)->key, - clear_flag(iter)->key_len)) - return clear_flag(iter); + clear_flag(iter)->key_len)) { + if (mode == ADD_UNIQUE) + return clear_flag(iter); + else /* mode == ADD_REPLACE */ + goto replace; + } /* Only account for identical reverse hash once */ if (iter_prev->p.reverse_hash != clear_flag(iter)->p.reverse_hash && !is_dummy(next)) @@ -820,48 +820,70 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, iter_prev = clear_flag(iter); iter = next; } + insert: assert(node != clear_flag(iter)); assert(!is_removed(iter_prev)); assert(!is_removed(iter)); assert(iter_prev != node); - if (!dummy) { + if (!dummy) node->p.next = clear_flag(iter); - } else { - /* - * Dummy node insertion is performed concurrently (help - * scheme). We try to link its next node, and if this - * succeeds, it _means_ it's us who link this dummy node - * into the table. force_dummy is set as soon as we - * succeed this cmpxchg within this function. - */ - if (!force_dummy) { - if (uatomic_cmpxchg(&node->p.next, NULL, - flag_dummy(clear_flag(iter))) != NULL) { - return NULL; - } - force_dummy = 1; - } else { - node->p.next = flag_dummy(clear_flag(iter)); - } - } + else + node->p.next = flag_dummy(clear_flag(iter)); if (is_dummy(iter)) new_node = flag_dummy(node); else new_node = node; - assert(new_node != NULL); if (uatomic_cmpxchg(&iter_prev->p.next, iter, - new_node) != iter) + new_node) != iter) { continue; /* retry */ + } else { + if (mode == ADD_REPLACE) + return_node = NULL; + else /* ADD_DEFAULT and ADD_UNIQUE */ + return_node = node; + goto gc_end; + } + + replace: + /* Insert after node to be replaced */ + iter_prev = clear_flag(iter); + iter = next; + assert(node != clear_flag(iter)); + assert(!is_removed(iter_prev)); + assert(!is_removed(iter)); + assert(iter_prev != node); + assert(!dummy); + node->p.next = clear_flag(iter); + if (is_dummy(iter)) + new_node = flag_dummy(node); else + new_node = node; + /* + * Here is the whole trick for lock-free replace: we add + * the replacement node _after_ the node we want to + * replace by atomically setting its next pointer at the + * same time we set its removal flag. Given that + * the lookups/get next use an iterator aware of the + * next pointer, they will either skip the old node due + * to the removal flag and see the new node, or use + * the old node, but will not see the new one. + */ + new_node = flag_removed(new_node); + if (uatomic_cmpxchg(&iter_prev->p.next, + iter, new_node) != iter) { + continue; /* retry */ + } else { + return_node = iter_prev; goto gc_end; + } + gc_node: assert(!is_removed(iter)); if (is_dummy(iter)) new_next = flag_dummy(clear_flag(next)); else new_next = clear_flag(next); - assert(new_next != NULL); (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next); /* retry */ } @@ -871,16 +893,12 @@ gc_end: order = get_count_order_ulong(index + 1); lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))]; dummy_node = (struct cds_lfht_node *) lookup; - if (_cds_lfht_gc_bucket(dummy_node, node)) { - /* Help expand */ - (void) _cds_lfht_add(ht, size >> 1, dummy_node, 0, 1); - goto gc_end; /* retry */ - } - return node; + _cds_lfht_gc_bucket(dummy_node, node); + return return_node; } static -int _cds_lfht_remove(struct cds_lfht *ht, unsigned long size, +int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, struct cds_lfht_node *node, int dummy_removal) { @@ -894,6 +912,8 @@ int _cds_lfht_remove(struct cds_lfht *ht, unsigned long size, assert(!is_removed(node)); old = rcu_dereference(node->p.next); do { + struct cds_lfht_node *new_next; + next = old; if (unlikely(is_removed(next))) goto end; @@ -901,9 +921,8 @@ int _cds_lfht_remove(struct cds_lfht *ht, unsigned long size, assert(is_dummy(next)); else assert(!is_dummy(next)); - assert(next != NULL); - old = uatomic_cmpxchg(&node->p.next, next, - flag_removed(next)); + new_next = flag_removed(next); + old = uatomic_cmpxchg(&node->p.next, next, new_next); } while (old != next); /* We performed the (logical) deletion. */ @@ -914,18 +933,13 @@ int _cds_lfht_remove(struct cds_lfht *ht, unsigned long size, * the node, and remove it (along with any other logically removed node) * if found. */ -gc_retry: hash = bit_reverse_ulong(node->p.reverse_hash); assert(size > 0); index = hash & (size - 1); order = get_count_order_ulong(index + 1); lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))]; dummy = (struct cds_lfht_node *) lookup; - if (_cds_lfht_gc_bucket(dummy, node)) { - /* Help expand */ - (void) _cds_lfht_add(ht, size >> 1, dummy, 0, 1); - goto gc_retry; /* retry */ - } + _cds_lfht_gc_bucket(dummy, node); end: /* * Only the flagging action indicated that we (and no other) @@ -939,49 +953,103 @@ end: } static -void init_table_hash(struct cds_lfht *ht, unsigned long i, - unsigned long len) +void *partition_resize_thread(void *arg) { - unsigned long j; + struct partition_resize_work *work = arg; - for (j = 0; j < len; j++) { - struct cds_lfht_node *new_node = - (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; + work->ht->cds_lfht_rcu_register_thread(); + work->fct(work->ht, work->i, work->start, work->len); + work->ht->cds_lfht_rcu_unregister_thread(); + return NULL; +} - dbg_printf("init hash entry: i %lu j %lu hash %lu\n", - i, j, !i ? 0 : (1UL << (i - 1)) + j); - new_node->p.reverse_hash = - bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j); - if (CMM_LOAD_SHARED(ht->in_progress_destroy)) - break; +static +void partition_resize_helper(struct cds_lfht *ht, unsigned long i, + unsigned long len, + void (*fct)(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len)) +{ + unsigned long partition_len; + struct partition_resize_work *work; + int thread, ret; + unsigned long nr_threads; + pthread_t *thread_id; + + /* + * Note: nr_cpus_mask + 1 is always power of 2. + * We spawn just the number of threads we need to satisfy the minimum + * partition size, up to the number of CPUs in the system. + */ + nr_threads = min(nr_cpus_mask + 1, + len >> MIN_PARTITION_PER_THREAD_ORDER); + partition_len = len >> get_count_order_ulong(nr_threads); + work = calloc(nr_threads, sizeof(*work)); + thread_id = calloc(nr_threads, sizeof(*thread_id)); + assert(work); + for (thread = 0; thread < nr_threads; thread++) { + work[thread].ht = ht; + work[thread].i = i; + work[thread].len = partition_len; + work[thread].start = thread * partition_len; + work[thread].fct = fct; + ret = pthread_create(&thread_id[thread], ht->resize_attr, + partition_resize_thread, &work[thread]); + assert(!ret); } + for (thread = 0; thread < nr_threads; thread++) { + ret = pthread_join(thread_id[thread], NULL); + assert(!ret); + } + free(work); + free(thread_id); } /* * Holding RCU read lock to protect _cds_lfht_add against memory * reclaim that could be performed by other call_rcu worker threads (ABA * problem). + * + * When we reach a certain length, we can split this population phase over + * many worker threads, based on the number of CPUs available in the system. + * This should therefore take care of not having the expand lagging behind too + * many concurrent insertion threads by using the scheduler's ability to + * schedule dummy node population fairly with insertions. */ static -void init_table_link(struct cds_lfht *ht, unsigned long i, unsigned long len) +void init_table_populate_partition(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len) { unsigned long j; - ht->cds_lfht_rcu_thread_online(); ht->cds_lfht_rcu_read_lock(); - for (j = 0; j < len; j++) { + for (j = start; j < start + len; j++) { struct cds_lfht_node *new_node = (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; - dbg_printf("init link: i %lu j %lu hash %lu\n", + dbg_printf("init populate: i %lu j %lu hash %lu\n", i, j, !i ? 0 : (1UL << (i - 1)) + j); + new_node->p.reverse_hash = + bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j); (void) _cds_lfht_add(ht, !i ? 0 : (1UL << (i - 1)), - new_node, 0, 1); + new_node, ADD_DEFAULT, 1); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) break; } ht->cds_lfht_rcu_read_unlock(); - ht->cds_lfht_rcu_thread_offline(); +} + +static +void init_table_populate(struct cds_lfht *ht, unsigned long i, + unsigned long len) +{ + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) { + ht->cds_lfht_rcu_thread_online(); + init_table_populate_partition(ht, i, 0, len); + ht->cds_lfht_rcu_thread_offline(); + return; + } + partition_resize_helper(ht, i, len, init_table_populate_partition); } static @@ -1005,24 +1073,19 @@ void init_table(struct cds_lfht *ht, ht->t.tbl[i] = calloc(1, sizeof(struct rcu_level) + (len * sizeof(struct _cds_lfht_node))); - - /* Set all dummy nodes reverse hash values for a level */ - init_table_hash(ht, i, len); + assert(ht->t.tbl[i]); /* - * Update table size. At this point, concurrent add/remove see - * dummy nodes with correctly initialized reverse hash value, - * but with NULL next pointers. If they do, they can help us - * link the dummy nodes into the list and retry. + * Set all dummy nodes reverse hash values for a level and + * link all dummy nodes into the table. */ - cmm_smp_wmb(); /* populate data before RCU size */ - CMM_STORE_SHARED(ht->t.size, !i ? 1 : (1UL << i)); + init_table_populate(ht, i, len); /* - * Link all dummy nodes into the table. Concurrent - * add/remove are helping us. + * Update table size. */ - init_table_link(ht, i, len); + cmm_smp_wmb(); /* populate data before RCU size */ + CMM_STORE_SHARED(ht->t.size, !i ? 1 : (1UL << i)); dbg_printf("init new size: %lu\n", !i ? 1 : (1UL << i)); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) @@ -1049,15 +1112,20 @@ void init_table(struct cds_lfht *ht, * * Logical removal and garbage collection can therefore be done in batch or on a * node-per-node basis, as long as the guarantee above holds. + * + * When we reach a certain length, we can split this removal over many worker + * threads, based on the number of CPUs available in the system. This should + * take care of not letting resize process lag behind too many concurrent + * updater threads actively inserting into the hash table. */ static -void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) +void remove_table_partition(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len) { unsigned long j; - ht->cds_lfht_rcu_thread_online(); ht->cds_lfht_rcu_read_lock(); - for (j = 0; j < len; j++) { + for (j = start; j < start + len; j++) { struct cds_lfht_node *fini_node = (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; @@ -1065,13 +1133,26 @@ void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) i, j, !i ? 0 : (1UL << (i - 1)) + j); fini_node->p.reverse_hash = bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j); - (void) _cds_lfht_remove(ht, !i ? 0 : (1UL << (i - 1)), + (void) _cds_lfht_del(ht, !i ? 0 : (1UL << (i - 1)), fini_node, 1); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) break; } ht->cds_lfht_rcu_read_unlock(); - ht->cds_lfht_rcu_thread_offline(); +} + +static +void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) +{ + + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) { + ht->cds_lfht_rcu_thread_online(); + remove_table_partition(ht, i, 0, len); + ht->cds_lfht_rcu_thread_offline(); + return; + } + partition_resize_helper(ht, i, len, remove_table_partition); } static @@ -1121,7 +1202,7 @@ void fini_table(struct cds_lfht *ht, } } -struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, +struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, cds_lfht_compare_fct compare_fct, unsigned long hash_seed, unsigned long init_size, @@ -1132,7 +1213,10 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, void (*cds_lfht_rcu_read_lock)(void), void (*cds_lfht_rcu_read_unlock)(void), void (*cds_lfht_rcu_thread_offline)(void), - void (*cds_lfht_rcu_thread_online)(void)) + void (*cds_lfht_rcu_thread_online)(void), + void (*cds_lfht_rcu_register_thread)(void), + void (*cds_lfht_rcu_unregister_thread)(void), + pthread_attr_t *attr) { struct cds_lfht *ht; unsigned long order; @@ -1141,6 +1225,7 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, if (init_size && (init_size & (init_size - 1))) return NULL; ht = calloc(1, sizeof(struct cds_lfht)); + assert(ht); ht->hash_fct = hash_fct; ht->compare_fct = compare_fct; ht->hash_seed = hash_seed; @@ -1150,6 +1235,9 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock; ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline; ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online; + ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread; + ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread; + ht->resize_attr = attr; ht->percpu_count = alloc_per_cpu_items_count(); /* this mutex should not nest in read-side C.S. */ pthread_mutex_init(&ht->resize_mutex, NULL); @@ -1164,7 +1252,8 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, return ht; } -struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key_len) +void cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key_len, + struct cds_lfht_iter *iter) { struct cds_lfht_node *node, *next, *dummy_node; struct _cds_lfht_node *lookup; @@ -1173,7 +1262,6 @@ struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key hash = ht->hash_fct(key, key_len, ht->hash_seed); reverse_hash = bit_reverse_ulong(hash); -restart: size = rcu_dereference(ht->t.size); index = hash & (size - 1); order = get_count_order_ulong(index + 1); @@ -1183,16 +1271,6 @@ restart: dummy_node = (struct cds_lfht_node *) lookup; /* We can always skip the dummy node initially */ node = rcu_dereference(dummy_node->p.next); - if (unlikely(node == NULL)) { - /* - * We are executing concurrently with a hash table - * expand, so we see a dummy node with NULL next value. - * Help expand by linking this node into the list and - * retry. - */ - (void) _cds_lfht_add(ht, size >> 1, dummy_node, 0, 1); - goto restart; /* retry */ - } node = clear_flag(node); for (;;) { if (unlikely(is_end(node))) { @@ -1212,21 +1290,22 @@ restart: node = clear_flag(next); } assert(!node || !is_dummy(rcu_dereference(node->p.next))); - return node; + iter->node = node; + iter->next = next; } -struct cds_lfht_node *cds_lfht_next(struct cds_lfht *ht, - struct cds_lfht_node *node) +void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter) { - struct cds_lfht_node *next; + struct cds_lfht_node *node, *next; unsigned long reverse_hash; void *key; size_t key_len; + node = iter->node; reverse_hash = node->p.reverse_hash; key = node->key; key_len = node->key_len; - next = rcu_dereference(node->p.next); + next = iter->next; node = clear_flag(next); for (;;) { @@ -1247,7 +1326,8 @@ struct cds_lfht_node *cds_lfht_next(struct cds_lfht *ht, node = clear_flag(next); } assert(!node || !is_dummy(rcu_dereference(node->p.next))); - return node; + iter->node = node; + iter->next = next; } void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node) @@ -1258,12 +1338,28 @@ void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node) node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash); size = rcu_dereference(ht->t.size); - (void) _cds_lfht_add(ht, size, node, 0, 0); + (void) _cds_lfht_add(ht, size, node, ADD_DEFAULT, 0); ht_count_add(ht, size); } struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht, - struct cds_lfht_node *node) + struct cds_lfht_node *node) +{ + unsigned long hash, size; + struct cds_lfht_node *ret; + + hash = ht->hash_fct(node->key, node->key_len, ht->hash_seed); + node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash); + + size = rcu_dereference(ht->t.size); + ret = _cds_lfht_add(ht, size, node, ADD_UNIQUE, 0); + if (ret == node) + ht_count_add(ht, size); + return ret; +} + +struct cds_lfht_node *cds_lfht_replace(struct cds_lfht *ht, + struct cds_lfht_node *node) { unsigned long hash, size; struct cds_lfht_node *ret; @@ -1272,21 +1368,21 @@ struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht, node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash); size = rcu_dereference(ht->t.size); - ret = _cds_lfht_add(ht, size, node, 1, 0); - if (ret != node) + ret = _cds_lfht_add(ht, size, node, ADD_REPLACE, 0); + if (ret == NULL) ht_count_add(ht, size); return ret; } -int cds_lfht_remove(struct cds_lfht *ht, struct cds_lfht_node *node) +int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node) { unsigned long size; int ret; size = rcu_dereference(ht->t.size); - ret = _cds_lfht_remove(ht, size, node, 0); + ret = _cds_lfht_del(ht, size, node, 0); if (!ret) - ht_count_remove(ht, size); + ht_count_del(ht, size); return ret; } @@ -1331,7 +1427,7 @@ int cds_lfht_delete_dummy(struct cds_lfht *ht) * Should only be called when no more concurrent readers nor writers can * possibly access the table. */ -int cds_lfht_destroy(struct cds_lfht *ht) +int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) { int ret; @@ -1343,6 +1439,8 @@ int cds_lfht_destroy(struct cds_lfht *ht) if (ret) return ret; free_per_cpu_items_count(ht->percpu_count); + if (attr) + *attr = ht->resize_attr; poison_free(ht); return ret; }