+ assert(!is_bucket(bucket));
+ assert(!is_removed(bucket));
+ assert(!is_bucket(node));
+ assert(!is_removed(node));
+ for (;;) {
+ iter_prev = bucket;
+ /* We can always skip the bucket node initially */
+ iter = rcu_dereference(iter_prev->next);
+ assert(!is_removed(iter));
+ assert(iter_prev->reverse_hash <= node->reverse_hash);
+ /*
+ * We should never be called with bucket (start of chain)
+ * and logically removed node (end of path compression
+ * marker) being the actual same node. This would be a
+ * bug in the algorithm implementation.
+ */
+ assert(bucket != node);
+ for (;;) {
+ if (caa_unlikely(is_end(iter)))
+ return;
+ if (caa_likely(clear_flag(iter)->reverse_hash > node->reverse_hash))
+ return;
+ next = rcu_dereference(clear_flag(iter)->next);
+ if (caa_likely(is_removed(next)))
+ break;
+ iter_prev = clear_flag(iter);
+ iter = next;
+ }
+ assert(!is_removed(iter));
+ if (is_bucket(iter))
+ new_next = flag_bucket(clear_flag(next));
+ else
+ new_next = clear_flag(next);
+ (void) uatomic_cmpxchg(&iter_prev->next, iter, new_next);
+ }
+}
+
+static
+int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
+ struct cds_lfht_node *old_node,
+ struct cds_lfht_node *old_next,
+ struct cds_lfht_node *new_node)
+{
+ struct cds_lfht_node *bucket, *ret_next;
+
+ if (!old_node) /* Return -ENOENT if asked to replace NULL node */
+ return -ENOENT;
+
+ assert(!is_removed(old_node));
+ assert(!is_bucket(old_node));
+ assert(!is_removed(new_node));
+ assert(!is_bucket(new_node));
+ assert(new_node != old_node);
+ for (;;) {
+ /* Insert after node to be replaced */
+ if (is_removed(old_next)) {
+ /*
+ * Too late, the old node has been removed under us
+ * between lookup and replace. Fail.
+ */
+ return -ENOENT;
+ }
+ assert(old_next == clear_flag(old_next));
+ assert(new_node != old_next);
+ /*
+ * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED
+ * flag. It is either set atomically at the same time
+ * (replace) or after (del).
+ */
+ assert(!is_removal_owner(old_next));
+ new_node->next = old_next;
+ /*
+ * Here is the whole trick for lock-free replace: we add
+ * the replacement node _after_ the node we want to
+ * replace by atomically setting its next pointer at the
+ * same time we set its removal flag. Given that
+ * the lookups/get next use an iterator aware of the
+ * next pointer, they will either skip the old node due
+ * to the removal flag and see the new node, or use
+ * the old node, but will not see the new one.
+ * This is a replacement of a node with another node
+ * that has the same value: we are therefore not
+ * removing a value from the hash table. We set both the
+ * REMOVED and REMOVAL_OWNER flags atomically so we own
+ * the node after successful cmpxchg.
+ */
+ ret_next = uatomic_cmpxchg(&old_node->next,
+ old_next, flag_removed_or_removal_owner(new_node));
+ if (ret_next == old_next)
+ break; /* We performed the replacement. */
+ old_next = ret_next;
+ }
+
+ /*
+ * Ensure that the old node is not visible to readers anymore:
+ * lookup for the node, and remove it (along with any other
+ * logically removed node) if found.
+ */
+ bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash));
+ _cds_lfht_gc_bucket(bucket, new_node);
+
+ assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
+ return 0;
+}
+
+/*
+ * A non-NULL unique_ret pointer uses the "add unique" (or uniquify) add
+ * mode. A NULL unique_ret allows creation of duplicate keys.
+ */
+static
+void _cds_lfht_add(struct cds_lfht *ht,
+ unsigned long hash,
+ cds_lfht_match_fct match,
+ const void *key,
+ unsigned long size,
+ struct cds_lfht_node *node,
+ struct cds_lfht_iter *unique_ret,
+ int bucket_flag)
+{
+ struct cds_lfht_node *iter_prev, *iter, *next, *new_node, *new_next,
+ *return_node;
+ struct cds_lfht_node *bucket;
+
+ assert(!is_bucket(node));
+ assert(!is_removed(node));
+ bucket = lookup_bucket(ht, size, hash);
+ for (;;) {
+ uint32_t chain_len = 0;
+
+ /*
+ * iter_prev points to the non-removed node prior to the
+ * insert location.
+ */
+ iter_prev = bucket;
+ /* We can always skip the bucket node initially */
+ iter = rcu_dereference(iter_prev->next);
+ assert(iter_prev->reverse_hash <= node->reverse_hash);
+ for (;;) {
+ if (caa_unlikely(is_end(iter)))
+ goto insert;
+ if (caa_likely(clear_flag(iter)->reverse_hash > node->reverse_hash))
+ goto insert;
+
+ /* bucket node is the first node of the identical-hash-value chain */
+ if (bucket_flag && clear_flag(iter)->reverse_hash == node->reverse_hash)
+ goto insert;
+
+ next = rcu_dereference(clear_flag(iter)->next);
+ if (caa_unlikely(is_removed(next)))
+ goto gc_node;
+
+ /* uniquely add */
+ if (unique_ret
+ && !is_bucket(next)
+ && clear_flag(iter)->reverse_hash == node->reverse_hash) {
+ struct cds_lfht_iter d_iter = { .node = node, .next = iter, };
+
+ /*
+ * uniquely adding inserts the node as the first
+ * node of the identical-hash-value node chain.
+ *
+ * This semantic ensures no duplicated keys
+ * should ever be observable in the table
+ * (including traversing the table node by
+ * node by forward iterations)
+ */
+ cds_lfht_next_duplicate(ht, match, key, &d_iter);
+ if (!d_iter.node)
+ goto insert;
+
+ *unique_ret = d_iter;
+ return;
+ }
+
+ /* Only account for identical reverse hash once */
+ if (iter_prev->reverse_hash != clear_flag(iter)->reverse_hash
+ && !is_bucket(next))
+ check_resize(ht, size, ++chain_len);
+ iter_prev = clear_flag(iter);
+ iter = next;
+ }
+
+ insert:
+ assert(node != clear_flag(iter));
+ assert(!is_removed(iter_prev));
+ assert(!is_removed(iter));
+ assert(iter_prev != node);
+ if (!bucket_flag)
+ node->next = clear_flag(iter);
+ else
+ node->next = flag_bucket(clear_flag(iter));
+ if (is_bucket(iter))
+ new_node = flag_bucket(node);
+ else
+ new_node = node;
+ if (uatomic_cmpxchg(&iter_prev->next, iter,
+ new_node) != iter) {
+ continue; /* retry */
+ } else {
+ return_node = node;
+ goto end;
+ }
+
+ gc_node:
+ assert(!is_removed(iter));
+ if (is_bucket(iter))
+ new_next = flag_bucket(clear_flag(next));
+ else
+ new_next = clear_flag(next);
+ (void) uatomic_cmpxchg(&iter_prev->next, iter, new_next);
+ /* retry */
+ }
+end:
+ if (unique_ret) {
+ unique_ret->node = return_node;
+ /* unique_ret->next left unset, never used. */
+ }
+}
+
+static
+int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
+ struct cds_lfht_node *node)
+{
+ struct cds_lfht_node *bucket, *next;
+
+ if (!node) /* Return -ENOENT if asked to delete NULL node */
+ return -ENOENT;
+
+ /* logically delete the node */
+ assert(!is_bucket(node));
+ assert(!is_removed(node));
+ assert(!is_removal_owner(node));
+
+ /*
+ * We are first checking if the node had previously been
+ * logically removed (this check is not atomic with setting the
+ * logical removal flag). Return -ENOENT if the node had
+ * previously been removed.
+ */
+ next = CMM_LOAD_SHARED(node->next); /* next is not dereferenced */
+ if (caa_unlikely(is_removed(next)))
+ return -ENOENT;
+ assert(!is_bucket(next));
+ /*
+ * The del operation semantic guarantees a full memory barrier
+ * before the uatomic_or atomic commit of the deletion flag.
+ */
+ cmm_smp_mb__before_uatomic_or();
+ /*
+ * We set the REMOVED_FLAG unconditionally. Note that there may
+ * be more than one concurrent thread setting this flag.
+ * Knowing which wins the race will be known after the garbage
+ * collection phase, stay tuned!
+ */
+ uatomic_or(&node->next, REMOVED_FLAG);
+ /* We performed the (logical) deletion. */
+
+ /*
+ * Ensure that the node is not visible to readers anymore: lookup for
+ * the node, and remove it (along with any other logically removed node)
+ * if found.
+ */
+ bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash));
+ _cds_lfht_gc_bucket(bucket, node);
+
+ assert(is_removed(CMM_LOAD_SHARED(node->next)));
+ /*
+ * Last phase: atomically exchange node->next with a version
+ * having "REMOVAL_OWNER_FLAG" set. If the returned node->next
+ * pointer did _not_ have "REMOVAL_OWNER_FLAG" set, we now own
+ * the node and win the removal race.
+ * It is interesting to note that all "add" paths are forbidden
+ * to change the next pointer starting from the point where the
+ * REMOVED_FLAG is set, so here using a read, followed by a
+ * xchg() suffice to guarantee that the xchg() will ever only
+ * set the "REMOVAL_OWNER_FLAG" (or change nothing if the flag
+ * was already set).
+ */
+ if (!is_removal_owner(uatomic_xchg(&node->next,
+ flag_removal_owner(node->next))))
+ return 0;
+ else
+ return -ENOENT;
+}
+
+static
+void *partition_resize_thread(void *arg)
+{
+ struct partition_resize_work *work = arg;
+
+ work->ht->flavor->register_thread();
+ work->fct(work->ht, work->i, work->start, work->len);
+ work->ht->flavor->unregister_thread();
+ return NULL;
+}
+
+static
+void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
+ unsigned long len,
+ void (*fct)(struct cds_lfht *ht, unsigned long i,
+ unsigned long start, unsigned long len))
+{
+ unsigned long partition_len;
+ struct partition_resize_work *work;
+ int thread, ret;
+ unsigned long nr_threads;
+
+ /*
+ * Note: nr_cpus_mask + 1 is always power of 2.
+ * We spawn just the number of threads we need to satisfy the minimum
+ * partition size, up to the number of CPUs in the system.
+ */
+ if (nr_cpus_mask > 0) {
+ nr_threads = min(nr_cpus_mask + 1,
+ len >> MIN_PARTITION_PER_THREAD_ORDER);
+ } else {
+ nr_threads = 1;
+ }
+ partition_len = len >> cds_lfht_get_count_order_ulong(nr_threads);
+ work = calloc(nr_threads, sizeof(*work));
+ assert(work);
+ for (thread = 0; thread < nr_threads; thread++) {
+ work[thread].ht = ht;
+ work[thread].i = i;
+ work[thread].len = partition_len;
+ work[thread].start = thread * partition_len;
+ work[thread].fct = fct;
+ ret = pthread_create(&(work[thread].thread_id), ht->resize_attr,
+ partition_resize_thread, &work[thread]);
+ assert(!ret);
+ }
+ for (thread = 0; thread < nr_threads; thread++) {
+ ret = pthread_join(work[thread].thread_id, NULL);
+ assert(!ret);
+ }
+ free(work);
+}
+
+/*
+ * Holding RCU read lock to protect _cds_lfht_add against memory
+ * reclaim that could be performed by other call_rcu worker threads (ABA
+ * problem).
+ *
+ * When we reach a certain length, we can split this population phase over
+ * many worker threads, based on the number of CPUs available in the system.
+ * This should therefore take care of not having the expand lagging behind too
+ * many concurrent insertion threads by using the scheduler's ability to
+ * schedule bucket node population fairly with insertions.
+ */
+static
+void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
+ unsigned long start, unsigned long len)
+{
+ unsigned long j, size = 1UL << (i - 1);
+
+ assert(i > MIN_TABLE_ORDER);
+ ht->flavor->read_lock();
+ for (j = size + start; j < size + start + len; j++) {
+ struct cds_lfht_node *new_node = bucket_at(ht, j);
+
+ assert(j >= size && j < (size << 1));
+ dbg_printf("init populate: order %lu index %lu hash %lu\n",
+ i, j, j);
+ new_node->reverse_hash = bit_reverse_ulong(j);
+ _cds_lfht_add(ht, j, NULL, NULL, size, new_node, NULL, 1);
+ }
+ ht->flavor->read_unlock();
+}
+
+static
+void init_table_populate(struct cds_lfht *ht, unsigned long i,
+ unsigned long len)
+{
+ assert(nr_cpus_mask != -1);
+ if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
+ ht->flavor->thread_online();
+ init_table_populate_partition(ht, i, 0, len);
+ ht->flavor->thread_offline();
+ return;
+ }
+ partition_resize_helper(ht, i, len, init_table_populate_partition);
+}
+
+static
+void init_table(struct cds_lfht *ht,
+ unsigned long first_order, unsigned long last_order)
+{
+ unsigned long i;
+
+ dbg_printf("init table: first_order %lu last_order %lu\n",
+ first_order, last_order);
+ assert(first_order > MIN_TABLE_ORDER);
+ for (i = first_order; i <= last_order; i++) {
+ unsigned long len;
+
+ len = 1UL << (i - 1);
+ dbg_printf("init order %lu len: %lu\n", i, len);
+
+ /* Stop expand if the resize target changes under us */
+ if (CMM_LOAD_SHARED(ht->resize_target) < (1UL << i))
+ break;
+
+ cds_lfht_alloc_bucket_table(ht, i);
+
+ /*
+ * Set all bucket nodes reverse hash values for a level and
+ * link all bucket nodes into the table.
+ */
+ init_table_populate(ht, i, len);
+
+ /*
+ * Update table size.
+ */
+ cmm_smp_wmb(); /* populate data before RCU size */
+ CMM_STORE_SHARED(ht->size, 1UL << i);
+
+ dbg_printf("init new size: %lu\n", 1UL << i);
+ if (CMM_LOAD_SHARED(ht->in_progress_destroy))
+ break;
+ }
+}
+
+/*
+ * Holding RCU read lock to protect _cds_lfht_remove against memory
+ * reclaim that could be performed by other call_rcu worker threads (ABA
+ * problem).
+ * For a single level, we logically remove and garbage collect each node.
+ *
+ * As a design choice, we perform logical removal and garbage collection on a
+ * node-per-node basis to simplify this algorithm. We also assume keeping good
+ * cache locality of the operation would overweight possible performance gain
+ * that could be achieved by batching garbage collection for multiple levels.
+ * However, this would have to be justified by benchmarks.
+ *
+ * Concurrent removal and add operations are helping us perform garbage
+ * collection of logically removed nodes. We guarantee that all logically
+ * removed nodes have been garbage-collected (unlinked) before call_rcu is
+ * invoked to free a hole level of bucket nodes (after a grace period).
+ *
+ * Logical removal and garbage collection can therefore be done in batch
+ * or on a node-per-node basis, as long as the guarantee above holds.
+ *
+ * When we reach a certain length, we can split this removal over many worker
+ * threads, based on the number of CPUs available in the system. This should
+ * take care of not letting resize process lag behind too many concurrent
+ * updater threads actively inserting into the hash table.
+ */
+static
+void remove_table_partition(struct cds_lfht *ht, unsigned long i,
+ unsigned long start, unsigned long len)
+{
+ unsigned long j, size = 1UL << (i - 1);
+
+ assert(i > MIN_TABLE_ORDER);
+ ht->flavor->read_lock();
+ for (j = size + start; j < size + start + len; j++) {
+ struct cds_lfht_node *fini_bucket = bucket_at(ht, j);
+ struct cds_lfht_node *parent_bucket = bucket_at(ht, j - size);
+
+ assert(j >= size && j < (size << 1));
+ dbg_printf("remove entry: order %lu index %lu hash %lu\n",
+ i, j, j);
+ /* Set the REMOVED_FLAG to freeze the ->next for gc */
+ uatomic_or(&fini_bucket->next, REMOVED_FLAG);
+ _cds_lfht_gc_bucket(parent_bucket, fini_bucket);
+ }
+ ht->flavor->read_unlock();
+}
+
+static
+void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
+{
+
+ assert(nr_cpus_mask != -1);
+ if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
+ ht->flavor->thread_online();
+ remove_table_partition(ht, i, 0, len);
+ ht->flavor->thread_offline();
+ return;
+ }
+ partition_resize_helper(ht, i, len, remove_table_partition);
+}
+
+/*
+ * fini_table() is never called for first_order == 0, which is why
+ * free_by_rcu_order == 0 can be used as criterion to know if free must
+ * be called.
+ */
+static
+void fini_table(struct cds_lfht *ht,
+ unsigned long first_order, unsigned long last_order)
+{
+ long i;
+ unsigned long free_by_rcu_order = 0;
+
+ dbg_printf("fini table: first_order %lu last_order %lu\n",
+ first_order, last_order);
+ assert(first_order > MIN_TABLE_ORDER);
+ for (i = last_order; i >= first_order; i--) {
+ unsigned long len;
+
+ len = 1UL << (i - 1);
+ dbg_printf("fini order %lu len: %lu\n", i, len);
+
+ /* Stop shrink if the resize target changes under us */
+ if (CMM_LOAD_SHARED(ht->resize_target) > (1UL << (i - 1)))
+ break;
+
+ cmm_smp_wmb(); /* populate data before RCU size */
+ CMM_STORE_SHARED(ht->size, 1UL << (i - 1));
+
+ /*
+ * We need to wait for all add operations to reach Q.S. (and
+ * thus use the new table for lookups) before we can start
+ * releasing the old bucket nodes. Otherwise their lookup will
+ * return a logically removed node as insert position.
+ */
+ ht->flavor->update_synchronize_rcu();
+ if (free_by_rcu_order)
+ cds_lfht_free_bucket_table(ht, free_by_rcu_order);
+
+ /*
+ * Set "removed" flag in bucket nodes about to be removed.
+ * Unlink all now-logically-removed bucket node pointers.
+ * Concurrent add/remove operation are helping us doing
+ * the gc.
+ */
+ remove_table(ht, i, len);
+
+ free_by_rcu_order = i;
+
+ dbg_printf("fini new size: %lu\n", 1UL << i);
+ if (CMM_LOAD_SHARED(ht->in_progress_destroy))
+ break;
+ }
+
+ if (free_by_rcu_order) {
+ ht->flavor->update_synchronize_rcu();
+ cds_lfht_free_bucket_table(ht, free_by_rcu_order);
+ }
+}
+
+static
+void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
+{
+ struct cds_lfht_node *prev, *node;
+ unsigned long order, len, i;
+
+ cds_lfht_alloc_bucket_table(ht, 0);
+
+ dbg_printf("create bucket: order 0 index 0 hash 0\n");
+ node = bucket_at(ht, 0);
+ node->next = flag_bucket(get_end());
+ node->reverse_hash = 0;
+
+ for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) {
+ len = 1UL << (order - 1);
+ cds_lfht_alloc_bucket_table(ht, order);
+
+ for (i = 0; i < len; i++) {
+ /*
+ * Now, we are trying to init the node with the
+ * hash=(len+i) (which is also a bucket with the
+ * index=(len+i)) and insert it into the hash table,
+ * so this node has to be inserted after the bucket
+ * with the index=(len+i)&(len-1)=i. And because there
+ * is no other non-bucket node nor bucket node with
+ * larger index/hash inserted, so the bucket node
+ * being inserted should be inserted directly linked
+ * after the bucket node with index=i.
+ */
+ prev = bucket_at(ht, i);
+ node = bucket_at(ht, len + i);
+
+ dbg_printf("create bucket: order %lu index %lu hash %lu\n",
+ order, len + i, len + i);
+ node->reverse_hash = bit_reverse_ulong(len + i);
+
+ /* insert after prev */
+ assert(is_bucket(prev->next));
+ node->next = prev->next;
+ prev->next = flag_bucket(node);
+ }
+ }
+}
+
+struct cds_lfht *_cds_lfht_new(unsigned long init_size,
+ unsigned long min_nr_alloc_buckets,
+ unsigned long max_nr_buckets,
+ int flags,
+ const struct cds_lfht_mm_type *mm,
+ const struct rcu_flavor_struct *flavor,
+ pthread_attr_t *attr)
+{
+ struct cds_lfht *ht;
+ unsigned long order;
+
+ /* min_nr_alloc_buckets must be power of two */
+ if (!min_nr_alloc_buckets || (min_nr_alloc_buckets & (min_nr_alloc_buckets - 1)))
+ return NULL;
+
+ /* init_size must be power of two */
+ if (!init_size || (init_size & (init_size - 1)))
+ return NULL;
+
+ /*
+ * Memory management plugin default.
+ */
+ if (!mm) {
+ if (CAA_BITS_PER_LONG > 32
+ && max_nr_buckets
+ && max_nr_buckets <= (1ULL << 32)) {
+ /*
+ * For 64-bit architectures, with max number of
+ * buckets small enough not to use the entire
+ * 64-bit memory mapping space (and allowing a
+ * fair number of hash table instances), use the
+ * mmap allocator, which is faster than the
+ * order allocator.
+ */
+ mm = &cds_lfht_mm_mmap;
+ } else {
+ /*
+ * The fallback is to use the order allocator.
+ */
+ mm = &cds_lfht_mm_order;
+ }
+ }
+
+ /* max_nr_buckets == 0 for order based mm means infinite */
+ if (mm == &cds_lfht_mm_order && !max_nr_buckets)
+ max_nr_buckets = 1UL << (MAX_TABLE_ORDER - 1);
+
+ /* max_nr_buckets must be power of two */
+ if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
+ return NULL;
+
+ min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
+ init_size = max(init_size, MIN_TABLE_SIZE);
+ max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
+ init_size = min(init_size, max_nr_buckets);
+
+ ht = mm->alloc_cds_lfht(min_nr_alloc_buckets, max_nr_buckets);
+ assert(ht);
+ assert(ht->mm == mm);
+ assert(ht->bucket_at == mm->bucket_at);
+
+ ht->flags = flags;
+ ht->flavor = flavor;
+ ht->resize_attr = attr;
+ alloc_split_items_count(ht);
+ /* this mutex should not nest in read-side C.S. */
+ pthread_mutex_init(&ht->resize_mutex, NULL);
+ order = cds_lfht_get_count_order_ulong(init_size);
+ ht->resize_target = 1UL << order;
+ cds_lfht_create_bucket(ht, 1UL << order);
+ ht->size = 1UL << order;
+ return ht;
+}
+
+void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
+ cds_lfht_match_fct match, const void *key,
+ struct cds_lfht_iter *iter)
+{
+ struct cds_lfht_node *node, *next, *bucket;
+ unsigned long reverse_hash, size;
+
+ reverse_hash = bit_reverse_ulong(hash);
+
+ size = rcu_dereference(ht->size);
+ bucket = lookup_bucket(ht, size, hash);
+ /* We can always skip the bucket node initially */
+ node = rcu_dereference(bucket->next);
+ node = clear_flag(node);
+ for (;;) {
+ if (caa_unlikely(is_end(node))) {
+ node = next = NULL;
+ break;
+ }
+ if (caa_unlikely(node->reverse_hash > reverse_hash)) {
+ node = next = NULL;
+ break;
+ }
+ next = rcu_dereference(node->next);
+ assert(node == clear_flag(node));
+ if (caa_likely(!is_removed(next))
+ && !is_bucket(next)
+ && node->reverse_hash == reverse_hash
+ && caa_likely(match(node, key))) {
+ break;
+ }
+ node = clear_flag(next);
+ }
+ assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+ iter->node = node;
+ iter->next = next;
+}
+
+void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
+ const void *key, struct cds_lfht_iter *iter)
+{
+ struct cds_lfht_node *node, *next;
+ unsigned long reverse_hash;
+
+ node = iter->node;
+ reverse_hash = node->reverse_hash;
+ next = iter->next;
+ node = clear_flag(next);
+
+ for (;;) {
+ if (caa_unlikely(is_end(node))) {
+ node = next = NULL;
+ break;
+ }
+ if (caa_unlikely(node->reverse_hash > reverse_hash)) {
+ node = next = NULL;
+ break;
+ }
+ next = rcu_dereference(node->next);
+ if (caa_likely(!is_removed(next))
+ && !is_bucket(next)
+ && caa_likely(match(node, key))) {
+ break;
+ }
+ node = clear_flag(next);
+ }
+ assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+ iter->node = node;
+ iter->next = next;
+}
+
+void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+ struct cds_lfht_node *node, *next;
+
+ node = clear_flag(iter->next);
+ for (;;) {
+ if (caa_unlikely(is_end(node))) {
+ node = next = NULL;
+ break;
+ }
+ next = rcu_dereference(node->next);
+ if (caa_likely(!is_removed(next))
+ && !is_bucket(next)) {
+ break;
+ }
+ node = clear_flag(next);
+ }
+ assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+ iter->node = node;
+ iter->next = next;
+}
+
+void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+ /*
+ * Get next after first bucket node. The first bucket node is the
+ * first node of the linked list.
+ */
+ iter->next = bucket_at(ht, 0)->next;
+ cds_lfht_next(ht, iter);
+}
+
+void cds_lfht_add(struct cds_lfht *ht, unsigned long hash,
+ struct cds_lfht_node *node)
+{
+ unsigned long size;
+
+ node->reverse_hash = bit_reverse_ulong(hash);
+ size = rcu_dereference(ht->size);
+ _cds_lfht_add(ht, hash, NULL, NULL, size, node, NULL, 0);
+ ht_count_add(ht, size, hash);
+}
+
+struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht,
+ unsigned long hash,
+ cds_lfht_match_fct match,
+ const void *key,
+ struct cds_lfht_node *node)
+{
+ unsigned long size;
+ struct cds_lfht_iter iter;
+
+ node->reverse_hash = bit_reverse_ulong(hash);
+ size = rcu_dereference(ht->size);
+ _cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
+ if (iter.node == node)
+ ht_count_add(ht, size, hash);
+ return iter.node;
+}
+
+struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
+ unsigned long hash,
+ cds_lfht_match_fct match,
+ const void *key,
+ struct cds_lfht_node *node)
+{
+ unsigned long size;
+ struct cds_lfht_iter iter;
+
+ node->reverse_hash = bit_reverse_ulong(hash);
+ size = rcu_dereference(ht->size);
+ for (;;) {
+ _cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
+ if (iter.node == node) {
+ ht_count_add(ht, size, hash);
+ return NULL;
+ }
+
+ if (!_cds_lfht_replace(ht, size, iter.node, iter.next, node))
+ return iter.node;
+ }
+}
+
+int cds_lfht_replace(struct cds_lfht *ht,
+ struct cds_lfht_iter *old_iter,
+ unsigned long hash,
+ cds_lfht_match_fct match,
+ const void *key,
+ struct cds_lfht_node *new_node)
+{
+ unsigned long size;
+
+ new_node->reverse_hash = bit_reverse_ulong(hash);
+ if (!old_iter->node)
+ return -ENOENT;
+ if (caa_unlikely(old_iter->node->reverse_hash != new_node->reverse_hash))
+ return -EINVAL;
+ if (caa_unlikely(!match(old_iter->node, key)))
+ return -EINVAL;
+ size = rcu_dereference(ht->size);
+ return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next,
+ new_node);
+}
+
+int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
+{
+ unsigned long size;
+ int ret;
+
+ size = rcu_dereference(ht->size);
+ ret = _cds_lfht_del(ht, size, node);
+ if (!ret) {
+ unsigned long hash;
+
+ hash = bit_reverse_ulong(node->reverse_hash);
+ ht_count_del(ht, size, hash);
+ }
+ return ret;
+}
+
+int cds_lfht_is_node_deleted(struct cds_lfht_node *node)
+{
+ return is_removed(CMM_LOAD_SHARED(node->next));
+}
+
+static
+int cds_lfht_delete_bucket(struct cds_lfht *ht)
+{
+ struct cds_lfht_node *node;
+ unsigned long order, i, size;
+
+ /* Check that the table is empty */
+ node = bucket_at(ht, 0);
+ do {
+ node = clear_flag(node)->next;
+ if (!is_bucket(node))
+ return -EPERM;
+ assert(!is_removed(node));
+ } while (!is_end(node));
+ /*
+ * size accessed without rcu_dereference because hash table is
+ * being destroyed.
+ */
+ size = ht->size;
+ /* Internal sanity check: all nodes left should be buckets */
+ for (i = 0; i < size; i++) {
+ node = bucket_at(ht, i);
+ dbg_printf("delete bucket: index %lu expected hash %lu hash %lu\n",
+ i, i, bit_reverse_ulong(node->reverse_hash));
+ assert(is_bucket(node->next));
+ }
+
+ for (order = cds_lfht_get_count_order_ulong(size); (long)order >= 0; order--)
+ cds_lfht_free_bucket_table(ht, order);
+
+ return 0;
+}
+
+/*
+ * Should only be called when no more concurrent readers nor writers can
+ * possibly access the table.
+ */
+int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
+{
+ int ret;
+
+ /* Wait for in-flight resize operations to complete */
+ _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
+ cmm_smp_mb(); /* Store destroy before load resize */
+ ht->flavor->thread_offline();
+ while (uatomic_read(&ht->in_progress_resize))
+ poll(NULL, 0, 100); /* wait for 100ms */
+ ht->flavor->thread_online();
+ ret = cds_lfht_delete_bucket(ht);
+ if (ret)
+ return ret;
+ free_split_items_count(ht);
+ if (attr)
+ *attr = ht->resize_attr;
+ poison_free(ht);
+ return ret;
+}
+
+void cds_lfht_count_nodes(struct cds_lfht *ht,
+ long *approx_before,
+ unsigned long *count,
+ long *approx_after)
+{
+ struct cds_lfht_node *node, *next;
+ unsigned long nr_bucket = 0, nr_removed = 0;
+
+ *approx_before = 0;
+ if (ht->split_count) {
+ int i;
+
+ for (i = 0; i < split_count_mask + 1; i++) {
+ *approx_before += uatomic_read(&ht->split_count[i].add);
+ *approx_before -= uatomic_read(&ht->split_count[i].del);
+ }
+ }
+
+ *count = 0;
+
+ /* Count non-bucket nodes in the table */
+ node = bucket_at(ht, 0);
+ do {
+ next = rcu_dereference(node->next);
+ if (is_removed(next)) {
+ if (!is_bucket(next))
+ (nr_removed)++;
+ else
+ (nr_bucket)++;
+ } else if (!is_bucket(next))
+ (*count)++;
+ else
+ (nr_bucket)++;
+ node = clear_flag(next);
+ } while (!is_end(node));
+ dbg_printf("number of logically removed nodes: %lu\n", nr_removed);
+ dbg_printf("number of bucket nodes: %lu\n", nr_bucket);
+ *approx_after = 0;
+ if (ht->split_count) {
+ int i;
+
+ for (i = 0; i < split_count_mask + 1; i++) {
+ *approx_after += uatomic_read(&ht->split_count[i].add);
+ *approx_after -= uatomic_read(&ht->split_count[i].del);
+ }
+ }
+}
+
+/* called with resize mutex held */
+static
+void _do_cds_lfht_grow(struct cds_lfht *ht,
+ unsigned long old_size, unsigned long new_size)
+{
+ unsigned long old_order, new_order;
+
+ old_order = cds_lfht_get_count_order_ulong(old_size);
+ new_order = cds_lfht_get_count_order_ulong(new_size);
+ dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
+ old_size, old_order, new_size, new_order);
+ assert(new_size > old_size);
+ init_table(ht, old_order + 1, new_order);
+}
+
+/* called with resize mutex held */
+static
+void _do_cds_lfht_shrink(struct cds_lfht *ht,
+ unsigned long old_size, unsigned long new_size)
+{
+ unsigned long old_order, new_order;
+
+ new_size = max(new_size, MIN_TABLE_SIZE);
+ old_order = cds_lfht_get_count_order_ulong(old_size);
+ new_order = cds_lfht_get_count_order_ulong(new_size);
+ dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
+ old_size, old_order, new_size, new_order);
+ assert(new_size < old_size);
+
+ /* Remove and unlink all bucket nodes to remove. */
+ fini_table(ht, new_order + 1, old_order);
+}
+
+
+/* called with resize mutex held */
+static
+void _do_cds_lfht_resize(struct cds_lfht *ht)
+{
+ unsigned long new_size, old_size;
+
+ /*
+ * Resize table, re-do if the target size has changed under us.
+ */
+ do {
+ assert(uatomic_read(&ht->in_progress_resize));
+ if (CMM_LOAD_SHARED(ht->in_progress_destroy))
+ break;
+ ht->resize_initiated = 1;
+ old_size = ht->size;
+ new_size = CMM_LOAD_SHARED(ht->resize_target);
+ if (old_size < new_size)
+ _do_cds_lfht_grow(ht, old_size, new_size);
+ else if (old_size > new_size)
+ _do_cds_lfht_shrink(ht, old_size, new_size);
+ ht->resize_initiated = 0;
+ /* write resize_initiated before read resize_target */
+ cmm_smp_mb();
+ } while (ht->size != CMM_LOAD_SHARED(ht->resize_target));
+}
+
+static
+unsigned long resize_target_grow(struct cds_lfht *ht, unsigned long new_size)
+{
+ return _uatomic_xchg_monotonic_increase(&ht->resize_target, new_size);
+}
+
+static
+void resize_target_update_count(struct cds_lfht *ht,
+ unsigned long count)
+{
+ count = max(count, MIN_TABLE_SIZE);
+ count = min(count, ht->max_nr_buckets);
+ uatomic_set(&ht->resize_target, count);
+}
+
+void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
+{
+ resize_target_update_count(ht, new_size);
+ CMM_STORE_SHARED(ht->resize_initiated, 1);
+ ht->flavor->thread_offline();
+ pthread_mutex_lock(&ht->resize_mutex);
+ _do_cds_lfht_resize(ht);
+ pthread_mutex_unlock(&ht->resize_mutex);
+ ht->flavor->thread_online();
+}
+
+static
+void do_resize_cb(struct rcu_head *head)
+{
+ struct rcu_resize_work *work =
+ caa_container_of(head, struct rcu_resize_work, head);
+ struct cds_lfht *ht = work->ht;
+
+ ht->flavor->thread_offline();
+ pthread_mutex_lock(&ht->resize_mutex);
+ _do_cds_lfht_resize(ht);
+ pthread_mutex_unlock(&ht->resize_mutex);
+ ht->flavor->thread_online();
+ poison_free(work);
+ cmm_smp_mb(); /* finish resize before decrement */
+ uatomic_dec(&ht->in_progress_resize);
+}
+
+static
+void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
+{
+ struct rcu_resize_work *work;
+
+ /* Store resize_target before read resize_initiated */
+ cmm_smp_mb();
+ if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
+ uatomic_inc(&ht->in_progress_resize);
+ cmm_smp_mb(); /* increment resize count before load destroy */
+ if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
+ uatomic_dec(&ht->in_progress_resize);
+ return;
+ }
+ work = malloc(sizeof(*work));
+ if (work == NULL) {
+ dbg_printf("error allocating resize work, bailing out\n");
+ uatomic_dec(&ht->in_progress_resize);
+ return;
+ }
+ work->ht = ht;
+ ht->flavor->update_call_rcu(&work->head, do_resize_cb);
+ CMM_STORE_SHARED(ht->resize_initiated, 1);
+ }
+}
+
+static
+void cds_lfht_resize_lazy_grow(struct cds_lfht *ht, unsigned long size, int growth)
+{
+ unsigned long target_size = size << growth;
+
+ target_size = min(target_size, ht->max_nr_buckets);
+ if (resize_target_grow(ht, target_size) >= target_size)
+ return;
+
+ __cds_lfht_resize_lazy_launch(ht);
+}
+
+/*
+ * We favor grow operations over shrink. A shrink operation never occurs
+ * if a grow operation is queued for lazy execution. A grow operation
+ * cancels any pending shrink lazy execution.
+ */
+static
+void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
+ unsigned long count)
+{
+ if (!(ht->flags & CDS_LFHT_AUTO_RESIZE))
+ return;
+ count = max(count, MIN_TABLE_SIZE);
+ count = min(count, ht->max_nr_buckets);
+ if (count == size)
+ return; /* Already the right size, no resize needed */
+ if (count > size) { /* lazy grow */
+ if (resize_target_grow(ht, count) >= count)
+ return;
+ } else { /* lazy shrink */
+ for (;;) {
+ unsigned long s;
+
+ s = uatomic_cmpxchg(&ht->resize_target, size, count);
+ if (s == size)
+ break; /* no resize needed */
+ if (s > size)
+ return; /* growing is/(was just) in progress */
+ if (s <= count)
+ return; /* some other thread do shrink */
+ size = s;
+ }