Fix: hash table growth (for small tables) should be limited
[urcu.git] / rculfhash.c
index 6c648f17da8a939d22894067eb1cc8c24620a8e0..7d39388f494700a1b06a5df885d5a5350cdf42a8 100644 (file)
  * implementation:
  *
  * - RCU read-side critical section allows readers to perform hash
- *   table lookups and use the returned objects safely by delaying
- *   memory reclaim of a grace period.
+ *   table lookups, as well as traversals, and use the returned objects
+ *   safely by allowing memory reclaim to take place only after a grace
+ *   period.
  * - Add and remove operations are lock-free, and do not need to
  *   allocate memory. They need to be executed within RCU read-side
  *   critical section to ensure the objects they read are valid and to
  *   deal with the cmpxchg ABA problem.
  * - add and add_unique operations are supported. add_unique checks if
- *   the node key already exists in the hash table. It ensures no key
- *   duplicata exists.
- * - The resize operation executes concurrently with add/remove/lookup.
+ *   the node key already exists in the hash table. It ensures not to
+ *   populate a duplicate key if the node key already exists in the hash
+ *   table.
+ * - The resize operation executes concurrently with
+ *   add/add_unique/add_replace/remove/lookup/traversal.
  * - Hash table nodes are contained within a split-ordered list. This
  *   list is ordered by incrementing reversed-bits-hash value.
  * - An index of bucket nodes is kept. These bucket nodes are the hash
- *   table "buckets", and they are also chained together in the
- *   split-ordered list, which allows recursive expansion.
- * - The resize operation for small tables only allows expanding the hash table.
- *   It is triggered automatically by detecting long chains in the add
- *   operation.
+ *   table "buckets". These buckets are internal nodes that allow to
+ *   perform a fast hash lookup, similarly to a skip list. These
+ *   buckets are chained together in the split-ordered list, which
+ *   allows recursive expansion by inserting new buckets between the
+ *   existing buckets. The split-ordered list allows adding new buckets
+ *   between existing buckets as the table needs to grow.
+ * - The resize operation for small tables only allows expanding the
+ *   hash table. It is triggered automatically by detecting long chains
+ *   in the add operation.
  * - The resize operation for larger tables (and available through an
  *   API) allows both expanding and shrinking the hash table.
  * - Split-counters are used to keep track of the number of
  *   (not visible to lookups anymore) before the RCU read-side critical
  *   section held across removal ends. Furthermore, this ensures that
  *   the node with "removed" flag set is removed from the linked-list
- *   before its memory is reclaimed. Only the thread which removal
- *   successfully set the "removed" flag (with a cmpxchg) into a node's
- *   next pointer is considered to have succeeded its removal (and thus
- *   owns the node to reclaim). Because we garbage-collect starting from
- *   an invariant node (the start-of-bucket bucket node) up to the
- *   "removed" node (or find a reverse-hash that is higher), we are sure
- *   that a successful traversal of the chain leads to a chain that is
- *   present in the linked-list (the start node is never removed) and
- *   that is does not contain the "removed" node anymore, even if
- *   concurrent delete/add operations are changing the structure of the
- *   list concurrently.
- * - The add operation performs gargage collection of buckets if it
- *   encounters nodes with removed flag set in the bucket where it wants
- *   to add its new node. This ensures lock-freedom of add operation by
+ *   before its memory is reclaimed. After setting the "removal" flag,
+ *   only the thread which removal is the first to set the "removal
+ *   owner" flag (with an xchg) into a node's next pointer is considered
+ *   to have succeeded its removal (and thus owns the node to reclaim).
+ *   Because we garbage-collect starting from an invariant node (the
+ *   start-of-bucket bucket node) up to the "removed" node (or find a
+ *   reverse-hash that is higher), we are sure that a successful
+ *   traversal of the chain leads to a chain that is present in the
+ *   linked-list (the start node is never removed) and that it does not
+ *   contain the "removed" node anymore, even if concurrent delete/add
+ *   operations are changing the structure of the list concurrently.
+ * - The add operations perform garbage collection of buckets if they
+ *   encounter nodes with removed flag set in the bucket where they want
+ *   to add their new node. This ensures lock-freedom of add operation by
  *   helping the remover unlink nodes from the list rather than to wait
  *   for it do to so.
- * - A RCU "order table" indexed by log2(hash index) is copied and
- *   expanded by the resize operation. This order table allows finding
- *   the "bucket node" tables.
- * - There is one bucket node table per hash index order. The size of
- *   each bucket node table is half the number of hashes contained in
- *   this order (except for order 0).
- * - synchronzie_rcu is used to garbage-collect the old bucket node table.
- * - The per-order bucket node tables contain a compact version of the
- *   hash table nodes. These tables are invariant after they are
- *   populated into the hash table.
+ * - There are three memory backends for the hash table buckets: the
+ *   "order table", the "chunks", and the "mmap".
+ * - These bucket containers contain a compact version of the hash table
+ *   nodes.
+ * - The RCU "order table":
+ *   -  has a first level table indexed by log2(hash index) which is
+ *      copied and expanded by the resize operation. This order table
+ *      allows finding the "bucket node" tables.
+ *   - There is one bucket node table per hash index order. The size of
+ *     each bucket node table is half the number of hashes contained in
+ *     this order (except for order 0).
+ * - The RCU "chunks" is best suited for close interaction with a page
+ *   allocator. It uses a linear array as index to "chunks" containing
+ *   each the same number of buckets.
+ * - The RCU "mmap" memory backend uses a single memory map to hold
+ *   all buckets.
+ * - synchronize_rcu is used to garbage-collect the old bucket node table.
+ *
+ * Ordering Guarantees:
+ *
+ * To discuss these guarantees, we first define "read" operation as any
+ * of the the basic cds_lfht_lookup, cds_lfht_next_duplicate,
+ * cds_lfht_first, cds_lfht_next operation, as well as
+ * cds_lfht_add_unique (failure). 
+ *
+ * We define "read traversal" operation as any of the following
+ * group of operations
+ *  - cds_lfht_lookup followed by iteration with cds_lfht_next_duplicate
+ *    (and/or cds_lfht_next, although less common).
+ *  - cds_lfht_add_unique (failure) followed by iteration with
+ *    cds_lfht_next_duplicate (and/or cds_lfht_next, although less
+ *    common).
+ *  - cds_lfht_first followed iteration with cds_lfht_next (and/or
+ *    cds_lfht_next_duplicate, although less common).
+ *
+ * We define "write" operations as any of cds_lfht_add, cds_lfht_replace,
+ * cds_lfht_add_unique (success), cds_lfht_add_replace, cds_lfht_del.
+ *
+ * When cds_lfht_add_unique succeeds (returns the node passed as
+ * parameter), it acts as a "write" operation. When cds_lfht_add_unique
+ * fails (returns a node different from the one passed as parameter), it
+ * acts as a "read" operation. A cds_lfht_add_unique failure is a
+ * cds_lfht_lookup "read" operation, therefore, any ordering guarantee
+ * referring to "lookup" imply any of "lookup" or cds_lfht_add_unique
+ * (failure).
+ *
+ * We define "prior" and "later" node as nodes observable by reads and
+ * read traversals respectively before and after a write or sequence of
+ * write operations.
+ *
+ * Hash-table operations are often cascaded, for example, the pointer
+ * returned by a cds_lfht_lookup() might be passed to a cds_lfht_next(),
+ * whose return value might in turn be passed to another hash-table
+ * operation. This entire cascaded series of operations must be enclosed
+ * by a pair of matching rcu_read_lock() and rcu_read_unlock()
+ * operations.
+ *
+ * The following ordering guarantees are offered by this hash table:
+ *
+ * A.1) "read" after "write": if there is ordering between a write and a
+ *      later read, then the read is guaranteed to see the write or some
+ *      later write.
+ * A.2) "read traversal" after "write": given that there is dependency
+ *      ordering between reads in a "read traversal", if there is
+ *      ordering between a write and the first read of the traversal,
+ *      then the "read traversal" is guaranteed to see the write or
+ *      some later write.
+ * B.1) "write" after "read": if there is ordering between a read and a
+ *      later write, then the read will never see the write.
+ * B.2) "write" after "read traversal": given that there is dependency
+ *      ordering between reads in a "read traversal", if there is
+ *      ordering between the last read of the traversal and a later
+ *      write, then the "read traversal" will never see the write.
+ * C)   "write" while "read traversal": if a write occurs during a "read
+ *      traversal", the traversal may, or may not, see the write.
+ * D.1) "write" after "write": if there is ordering between a write and
+ *      a later write, then the later write is guaranteed to see the
+ *      effects of the first write.
+ * D.2) Concurrent "write" pairs: The system will assign an arbitrary
+ *      order to any pair of concurrent conflicting writes.
+ *      Non-conflicting writes (for example, to different keys) are
+ *      unordered.
+ * E)   If a grace period separates a "del" or "replace" operation
+ *      and a subsequent operation, then that subsequent operation is
+ *      guaranteed not to see the removed item.
+ * F)   Uniqueness guarantee: given a hash table that does not contain
+ *      duplicate items for a given key, there will only be one item in
+ *      the hash table after an arbitrary sequence of add_unique and/or
+ *      add_replace operations. Note, however, that a pair of
+ *      concurrent read operations might well access two different items
+ *      with that key.
+ * G.1) If a pair of lookups for a given key are ordered (e.g. by a
+ *      memory barrier), then the second lookup will return the same
+ *      node as the previous lookup, or some later node.
+ * G.2) A "read traversal" that starts after the end of a prior "read
+ *      traversal" (ordered by memory barriers) is guaranteed to see the
+ *      same nodes as the previous traversal, or some later nodes.
+ * G.3) Concurrent "read" pairs: concurrent reads are unordered. For
+ *      example, if a pair of reads to the same key run concurrently
+ *      with an insertion of that same key, the reads remain unordered
+ *      regardless of their return values. In other words, you cannot
+ *      rely on the values returned by the reads to deduce ordering.
+ *
+ * Progress guarantees:
+ *
+ * * Reads are wait-free. These operations always move forward in the
+ *   hash table linked list, and this list has no loop.
+ * * Writes are lock-free. Any retry loop performed by a write operation
+ *   is triggered by progress made within another update operation.
  *
  * Bucket node tables:
  *
  *
  * A bit of ascii art explanation:
  * 
- * Order index is the off-by-one compare to the actual power of 2 because 
- * we use index 0 to deal with the 0 special-case.
+ * The order index is the off-by-one compared to the actual power of 2
+ * because we use index 0 to deal with the 0 special-case.
  * 
  * This shows the nodes for a small table ordered by reversed bits:
  * 
  */
 
 #define _LGPL_SOURCE
+#define _GNU_SOURCE
 #include <stdlib.h>
 #include <errno.h>
 #include <assert.h>
 #include <stdio.h>
 #include <stdint.h>
 #include <string.h>
+#include <sched.h>
 
 #include "config.h"
 #include <urcu.h>
  * Split-counters lazily update the global counter each 1024
  * addition/removal. It automatically keeps track of resize required.
  * We use the bucket length as indicator for need to expand for small
- * tables and machines lacking per-cpu data suppport.
+ * tables and machines lacking per-cpu data support.
  */
 #define COUNT_COMMIT_ORDER             10
 #define DEFAULT_SPLIT_COUNT_MASK       0xFUL
  * removal, and that node garbage collection must be performed.
  * The bucket flag does not require to be updated atomically with the
  * pointer, but it is added as a pointer low bit flag to save space.
+ * The "removal owner" flag is used to detect which of the "del"
+ * operation that has set the "removed flag" gets to return the removed
+ * node to its caller. Note that the replace operation does not need to
+ * iteract with the "removal owner" flag, because it validates that
+ * the "removed" flag is not set before performing its cmpxchg.
  */
 #define REMOVED_FLAG           (1UL << 0)
 #define BUCKET_FLAG            (1UL << 1)
-#define FLAGS_MASK             ((1UL << 2) - 1)
+#define REMOVAL_OWNER_FLAG     (1UL << 2)
+#define FLAGS_MASK             ((1UL << 3) - 1)
 
 /* Value of the end pointer. Should not interact with flags. */
 #define END_VALUE              NULL
@@ -267,7 +381,8 @@ uint8_t bit_reverse_u8(uint8_t v)
        return BitReverseTable256[v];
 }
 
-static __attribute__((unused))
+#if (CAA_BITS_PER_LONG == 32)
+static
 uint32_t bit_reverse_u32(uint32_t v)
 {
        return ((uint32_t) bit_reverse_u8(v) << 24) | 
@@ -275,8 +390,8 @@ uint32_t bit_reverse_u32(uint32_t v)
                ((uint32_t) bit_reverse_u8(v >> 16) << 8) | 
                ((uint32_t) bit_reverse_u8(v >> 24));
 }
-
-static __attribute__((unused))
+#else
+static
 uint64_t bit_reverse_u64(uint64_t v)
 {
        return ((uint64_t) bit_reverse_u8(v) << 56) | 
@@ -288,6 +403,7 @@ uint64_t bit_reverse_u64(uint64_t v)
                ((uint64_t) bit_reverse_u8(v >> 48) << 8) |
                ((uint64_t) bit_reverse_u8(v >> 56));
 }
+#endif
 
 static
 unsigned long bit_reverse_ulong(unsigned long v)
@@ -405,7 +521,7 @@ unsigned int fls_u32(uint32_t x)
 }
 #endif
 
-unsigned int fls_ulong(unsigned long x)
+unsigned int cds_lfht_fls_ulong(unsigned long x)
 {
 #if (CAA_BITS_PER_LONG == 32)
        return fls_u32(x);
@@ -418,7 +534,7 @@ unsigned int fls_ulong(unsigned long x)
  * Return the minimum order for which x <= (1UL << order).
  * Return -1 if x is 0.
  */
-int get_count_order_u32(uint32_t x)
+int cds_lfht_get_count_order_u32(uint32_t x)
 {
        if (!x)
                return -1;
@@ -430,12 +546,12 @@ int get_count_order_u32(uint32_t x)
  * Return the minimum order for which x <= (1UL << order).
  * Return -1 if x is 0.
  */
-int get_count_order_ulong(unsigned long x)
+int cds_lfht_get_count_order_ulong(unsigned long x)
 {
        if (!x)
                return -1;
 
-       return fls_ulong(x - 1);
+       return cds_lfht_fls_ulong(x - 1);
 }
 
 static
@@ -447,6 +563,7 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
 
 static long nr_cpus_mask = -1;
 static long split_count_mask = -1;
+static int split_count_order = -1;
 
 #if defined(HAVE_SYSCONF)
 static void ht_init_nr_cpus_mask(void)
@@ -462,7 +579,7 @@ static void ht_init_nr_cpus_mask(void)
         * round up number of CPUs to next power of two, so we
         * can use & for modulo.
         */
-       maxcpus = 1UL << get_count_order_ulong(maxcpus);
+       maxcpus = 1UL << cds_lfht_get_count_order_ulong(maxcpus);
        nr_cpus_mask = maxcpus - 1;
 }
 #else /* #if defined(HAVE_SYSCONF) */
@@ -475,20 +592,21 @@ static void ht_init_nr_cpus_mask(void)
 static
 void alloc_split_items_count(struct cds_lfht *ht)
 {
-       struct ht_items_count *count;
-
        if (nr_cpus_mask == -1) {
                ht_init_nr_cpus_mask();
                if (nr_cpus_mask < 0)
                        split_count_mask = DEFAULT_SPLIT_COUNT_MASK;
                else
                        split_count_mask = nr_cpus_mask;
+               split_count_order =
+                       cds_lfht_get_count_order_ulong(split_count_mask + 1);
        }
 
        assert(split_count_mask >= 0);
 
        if (ht->flags & CDS_LFHT_ACCOUNTING) {
-               ht->split_count = calloc(split_count_mask + 1, sizeof(*count));
+               ht->split_count = calloc(split_count_mask + 1,
+                                       sizeof(struct ht_items_count));
                assert(ht->split_count);
        } else {
                ht->split_count = NULL;
@@ -598,14 +716,39 @@ void check_resize(struct cds_lfht *ht, unsigned long size, uint32_t chain_len)
         * Use bucket-local length for small table expand and for
         * environments lacking per-cpu data support.
         */
-       if (count >= (1UL << COUNT_COMMIT_ORDER))
+       if (count >= (1UL << (COUNT_COMMIT_ORDER + split_count_order)))
                return;
        if (chain_len > 100)
                dbg_printf("WARNING: large chain length: %u.\n",
                           chain_len);
-       if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD)
-               cds_lfht_resize_lazy_grow(ht, size,
-                       get_count_order_u32(chain_len - (CHAIN_LEN_TARGET - 1)));
+       if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD) {
+               int growth;
+
+               /*
+                * Ideal growth calculated based on chain length.
+                */
+               growth = cds_lfht_get_count_order_u32(chain_len
+                               - (CHAIN_LEN_TARGET - 1));
+               if ((ht->flags & CDS_LFHT_ACCOUNTING)
+                               && (size << growth)
+                                       >= (1UL << (COUNT_COMMIT_ORDER
+                                               + split_count_order))) {
+                       /*
+                        * If ideal growth expands the hash table size
+                        * beyond the "small hash table" sizes, use the
+                        * maximum small hash table size to attempt
+                        * expanding the hash table. This only applies
+                        * when node accounting is available, otherwise
+                        * the chain length is used to expand the hash
+                        * table in every case.
+                        */
+                       growth = COUNT_COMMIT_ORDER + split_count_order
+                               - cds_lfht_get_count_order_ulong(size);
+                       if (growth <= 0)
+                               return;
+               }
+               cds_lfht_resize_lazy_grow(ht, size, growth);
+       }
 }
 
 static
@@ -620,12 +763,6 @@ int is_removed(struct cds_lfht_node *node)
        return ((unsigned long) node) & REMOVED_FLAG;
 }
 
-static
-struct cds_lfht_node *flag_removed(struct cds_lfht_node *node)
-{
-       return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG);
-}
-
 static
 int is_bucket(struct cds_lfht_node *node)
 {
@@ -638,6 +775,24 @@ struct cds_lfht_node *flag_bucket(struct cds_lfht_node *node)
        return (struct cds_lfht_node *) (((unsigned long) node) | BUCKET_FLAG);
 }
 
+static
+int is_removal_owner(struct cds_lfht_node *node)
+{
+       return ((unsigned long) node) & REMOVAL_OWNER_FLAG;
+}
+
+static
+struct cds_lfht_node *flag_removal_owner(struct cds_lfht_node *node)
+{
+       return (struct cds_lfht_node *) (((unsigned long) node) | REMOVAL_OWNER_FLAG);
+}
+
+static
+struct cds_lfht_node *flag_removed_or_removal_owner(struct cds_lfht_node *node)
+{
+       return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG | REMOVAL_OWNER_FLAG);
+}
+
 static
 struct cds_lfht_node *get_end(void)
 {
@@ -706,13 +861,16 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod
 
        assert(!is_bucket(bucket));
        assert(!is_removed(bucket));
+       assert(!is_removal_owner(bucket));
        assert(!is_bucket(node));
        assert(!is_removed(node));
+       assert(!is_removal_owner(node));
        for (;;) {
                iter_prev = bucket;
                /* We can always skip the bucket node initially */
                iter = rcu_dereference(iter_prev->next);
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                assert(iter_prev->reverse_hash <= node->reverse_hash);
                /*
                 * We should never be called with bucket (start of chain)
@@ -733,13 +891,13 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod
                        iter = next;
                }
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                if (is_bucket(iter))
                        new_next = flag_bucket(clear_flag(next));
                else
                        new_next = clear_flag(next);
                (void) uatomic_cmpxchg(&iter_prev->next, iter, new_next);
        }
-       return;
 }
 
 static
@@ -754,8 +912,10 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
                return -ENOENT;
 
        assert(!is_removed(old_node));
+       assert(!is_removal_owner(old_node));
        assert(!is_bucket(old_node));
        assert(!is_removed(new_node));
+       assert(!is_removal_owner(new_node));
        assert(!is_bucket(new_node));
        assert(new_node != old_node);
        for (;;) {
@@ -767,9 +927,15 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
                         */
                        return -ENOENT;
                }
-               assert(!is_bucket(old_next));
-               assert(new_node != clear_flag(old_next));
-               new_node->next = clear_flag(old_next);
+               assert(old_next == clear_flag(old_next));
+               assert(new_node != old_next);
+               /*
+                * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED
+                * flag. It is either set atomically at the same time
+                * (replace) or after (del).
+                */
+               assert(!is_removal_owner(old_next));
+               new_node->next = old_next;
                /*
                 * Here is the whole trick for lock-free replace: we add
                 * the replacement node _after_ the node we want to
@@ -779,9 +945,14 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
                 * next pointer, they will either skip the old node due
                 * to the removal flag and see the new node, or use
                 * the old node, but will not see the new one.
+                * This is a replacement of a node with another node
+                * that has the same value: we are therefore not
+                * removing a value from the hash table. We set both the
+                * REMOVED and REMOVAL_OWNER flags atomically so we own
+                * the node after successful cmpxchg.
                 */
                ret_next = uatomic_cmpxchg(&old_node->next,
-                             old_next, flag_removed(new_node));
+                       old_next, flag_removed_or_removal_owner(new_node));
                if (ret_next == old_next)
                        break;          /* We performed the replacement. */
                old_next = ret_next;
@@ -795,7 +966,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
        bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash));
        _cds_lfht_gc_bucket(bucket, new_node);
 
-       assert(is_removed(rcu_dereference(old_node->next)));
+       assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
        return 0;
 }
 
@@ -805,6 +976,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
  */
 static
 void _cds_lfht_add(struct cds_lfht *ht,
+               unsigned long hash,
                cds_lfht_match_fct match,
                const void *key,
                unsigned long size,
@@ -818,7 +990,8 @@ void _cds_lfht_add(struct cds_lfht *ht,
 
        assert(!is_bucket(node));
        assert(!is_removed(node));
-       bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash));
+       assert(!is_removal_owner(node));
+       bucket = lookup_bucket(ht, size, hash);
        for (;;) {
                uint32_t chain_len = 0;
 
@@ -856,8 +1029,8 @@ void _cds_lfht_add(struct cds_lfht *ht,
                                 *
                                 * This semantic ensures no duplicated keys
                                 * should ever be observable in the table
-                                * (including observe one node by one node
-                                * by forward iterations)
+                                * (including traversing the table node by
+                                * node by forward iterations)
                                 */
                                cds_lfht_next_duplicate(ht, match, key, &d_iter);
                                if (!d_iter.node)
@@ -878,7 +1051,9 @@ void _cds_lfht_add(struct cds_lfht *ht,
        insert:
                assert(node != clear_flag(iter));
                assert(!is_removed(iter_prev));
+               assert(!is_removal_owner(iter_prev));
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                assert(iter_prev != node);
                if (!bucket_flag)
                        node->next = clear_flag(iter);
@@ -898,6 +1073,7 @@ void _cds_lfht_add(struct cds_lfht *ht,
 
        gc_node:
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                if (is_bucket(iter))
                        new_next = flag_bucket(clear_flag(next));
                else
@@ -914,10 +1090,9 @@ end:
 
 static
 int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
-               struct cds_lfht_node *node,
-               int bucket_removal)
+               struct cds_lfht_node *node)
 {
-       struct cds_lfht_node *bucket, *next, *old;
+       struct cds_lfht_node *bucket, *next;
 
        if (!node)      /* Return -ENOENT if asked to delete NULL node */
                return -ENOENT;
@@ -925,20 +1100,30 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
        /* logically delete the node */
        assert(!is_bucket(node));
        assert(!is_removed(node));
-       old = rcu_dereference(node->next);
-       do {
-               struct cds_lfht_node *new_next;
+       assert(!is_removal_owner(node));
 
-               next = old;
-               if (caa_unlikely(is_removed(next)))
-                       return -ENOENT;
-               if (bucket_removal)
-                       assert(is_bucket(next));
-               else
-                       assert(!is_bucket(next));
-               new_next = flag_removed(next);
-               old = uatomic_cmpxchg(&node->next, next, new_next);
-       } while (old != next);
+       /*
+        * We are first checking if the node had previously been
+        * logically removed (this check is not atomic with setting the
+        * logical removal flag). Return -ENOENT if the node had
+        * previously been removed.
+        */
+       next = CMM_LOAD_SHARED(node->next);     /* next is not dereferenced */
+       if (caa_unlikely(is_removed(next)))
+               return -ENOENT;
+       assert(!is_bucket(next));
+       /*
+        * The del operation semantic guarantees a full memory barrier
+        * before the uatomic_or atomic commit of the deletion flag.
+        */
+       cmm_smp_mb__before_uatomic_or();
+       /*
+        * We set the REMOVED_FLAG unconditionally. Note that there may
+        * be more than one concurrent thread setting this flag.
+        * Knowing which wins the race will be known after the garbage
+        * collection phase, stay tuned!
+        */
+       uatomic_or(&node->next, REMOVED_FLAG);
        /* We performed the (logical) deletion. */
 
        /*
@@ -949,8 +1134,24 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
        bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash));
        _cds_lfht_gc_bucket(bucket, node);
 
-       assert(is_removed(rcu_dereference(node->next)));
-       return 0;
+       assert(is_removed(CMM_LOAD_SHARED(node->next)));
+       /*
+        * Last phase: atomically exchange node->next with a version
+        * having "REMOVAL_OWNER_FLAG" set. If the returned node->next
+        * pointer did _not_ have "REMOVAL_OWNER_FLAG" set, we now own
+        * the node and win the removal race.
+        * It is interesting to note that all "add" paths are forbidden
+        * to change the next pointer starting from the point where the
+        * REMOVED_FLAG is set, so here using a read, followed by a
+        * xchg() suffice to guarantee that the xchg() will ever only
+        * set the "REMOVAL_OWNER_FLAG" (or change nothing if the flag
+        * was already set).
+        */
+       if (!is_removal_owner(uatomic_xchg(&node->next,
+                       flag_removal_owner(node->next))))
+               return 0;
+       else
+               return -ENOENT;
 }
 
 static
@@ -986,7 +1187,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
        } else {
                nr_threads = 1;
        }
-       partition_len = len >> get_count_order_ulong(nr_threads);
+       partition_len = len >> cds_lfht_get_count_order_ulong(nr_threads);
        work = calloc(nr_threads, sizeof(*work));
        assert(work);
        for (thread = 0; thread < nr_threads; thread++) {
@@ -1032,7 +1233,7 @@ void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
                dbg_printf("init populate: order %lu index %lu hash %lu\n",
                           i, j, j);
                new_node->reverse_hash = bit_reverse_ulong(j);
-               _cds_lfht_add(ht, NULL, NULL, size, new_node, NULL, 1);
+               _cds_lfht_add(ht, j, NULL, NULL, size, new_node, NULL, 1);
        }
        ht->flavor->read_unlock();
 }
@@ -1107,8 +1308,8 @@ void init_table(struct cds_lfht *ht,
  * removed nodes have been garbage-collected (unlinked) before call_rcu is
  * invoked to free a hole level of bucket nodes (after a grace period).
  *
- * Logical removal and garbage collection can therefore be done in batch or on a
- * node-per-node basis, as long as the guarantee above holds.
+ * Logical removal and garbage collection can therefore be done in batch
+ * or on a node-per-node basis, as long as the guarantee above holds.
  *
  * When we reach a certain length, we can split this removal over many worker
  * threads, based on the number of CPUs available in the system. This should
@@ -1124,13 +1325,15 @@ void remove_table_partition(struct cds_lfht *ht, unsigned long i,
        assert(i > MIN_TABLE_ORDER);
        ht->flavor->read_lock();
        for (j = size + start; j < size + start + len; j++) {
-               struct cds_lfht_node *fini_node = bucket_at(ht, j);
+               struct cds_lfht_node *fini_bucket = bucket_at(ht, j);
+               struct cds_lfht_node *parent_bucket = bucket_at(ht, j - size);
 
                assert(j >= size && j < (size << 1));
                dbg_printf("remove entry: order %lu index %lu hash %lu\n",
                           i, j, j);
-               fini_node->reverse_hash = bit_reverse_ulong(j);
-               (void) _cds_lfht_del(ht, size, fini_node, 1);
+               /* Set the REMOVED_FLAG to freeze the ->next for gc */
+               uatomic_or(&fini_bucket->next, REMOVED_FLAG);
+               _cds_lfht_gc_bucket(parent_bucket, fini_bucket);
        }
        ht->flavor->read_unlock();
 }
@@ -1221,7 +1424,7 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
        node->next = flag_bucket(get_end());
        node->reverse_hash = 0;
 
-       for (order = 1; order < get_count_order_ulong(size) + 1; order++) {
+       for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) {
                len = 1UL << (order - 1);
                cds_lfht_alloc_bucket_table(ht, order);
 
@@ -1271,6 +1474,30 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
        if (!init_size || (init_size & (init_size - 1)))
                return NULL;
 
+       /*
+        * Memory management plugin default.
+        */
+       if (!mm) {
+               if (CAA_BITS_PER_LONG > 32
+                               && max_nr_buckets
+                               && max_nr_buckets <= (1ULL << 32)) {
+                       /*
+                        * For 64-bit architectures, with max number of
+                        * buckets small enough not to use the entire
+                        * 64-bit memory mapping space (and allowing a
+                        * fair number of hash table instances), use the
+                        * mmap allocator, which is faster than the
+                        * order allocator.
+                        */
+                       mm = &cds_lfht_mm_mmap;
+               } else {
+                       /*
+                        * The fallback is to use the order allocator.
+                        */
+                       mm = &cds_lfht_mm_order;
+               }
+       }
+
        /* max_nr_buckets == 0 for order based mm means infinite */
        if (mm == &cds_lfht_mm_order && !max_nr_buckets)
                max_nr_buckets = 1UL << (MAX_TABLE_ORDER - 1);
@@ -1295,7 +1522,7 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
        alloc_split_items_count(ht);
        /* this mutex should not nest in read-side C.S. */
        pthread_mutex_init(&ht->resize_mutex, NULL);
-       order = get_count_order_ulong(init_size);
+       order = cds_lfht_get_count_order_ulong(init_size);
        ht->resize_target = 1UL << order;
        cds_lfht_create_bucket(ht, 1UL << order);
        ht->size = 1UL << order;
@@ -1335,7 +1562,7 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
                }
                node = clear_flag(next);
        }
-       assert(!node || !is_bucket(rcu_dereference(node->next)));
+       assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
        iter->node = node;
        iter->next = next;
 }
@@ -1368,7 +1595,7 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
                }
                node = clear_flag(next);
        }
-       assert(!node || !is_bucket(rcu_dereference(node->next)));
+       assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
        iter->node = node;
        iter->next = next;
 }
@@ -1390,7 +1617,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
                }
                node = clear_flag(next);
        }
-       assert(!node || !is_bucket(rcu_dereference(node->next)));
+       assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
        iter->node = node;
        iter->next = next;
 }
@@ -1410,9 +1637,9 @@ void cds_lfht_add(struct cds_lfht *ht, unsigned long hash,
 {
        unsigned long size;
 
-       node->reverse_hash = bit_reverse_ulong((unsigned long) hash);
+       node->reverse_hash = bit_reverse_ulong(hash);
        size = rcu_dereference(ht->size);
-       _cds_lfht_add(ht, NULL, NULL, size, node, NULL, 0);
+       _cds_lfht_add(ht, hash, NULL, NULL, size, node, NULL, 0);
        ht_count_add(ht, size, hash);
 }
 
@@ -1425,9 +1652,9 @@ struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht,
        unsigned long size;
        struct cds_lfht_iter iter;
 
-       node->reverse_hash = bit_reverse_ulong((unsigned long) hash);
+       node->reverse_hash = bit_reverse_ulong(hash);
        size = rcu_dereference(ht->size);
-       _cds_lfht_add(ht, match, key, size, node, &iter, 0);
+       _cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
        if (iter.node == node)
                ht_count_add(ht, size, hash);
        return iter.node;
@@ -1442,10 +1669,10 @@ struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
        unsigned long size;
        struct cds_lfht_iter iter;
 
-       node->reverse_hash = bit_reverse_ulong((unsigned long) hash);
+       node->reverse_hash = bit_reverse_ulong(hash);
        size = rcu_dereference(ht->size);
        for (;;) {
-               _cds_lfht_add(ht, match, key, size, node, &iter, 0);
+               _cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
                if (iter.node == node) {
                        ht_count_add(ht, size, hash);
                        return NULL;
@@ -1456,30 +1683,48 @@ struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
        }
 }
 
-int cds_lfht_replace(struct cds_lfht *ht, struct cds_lfht_iter *old_iter,
+int cds_lfht_replace(struct cds_lfht *ht,
+               struct cds_lfht_iter *old_iter,
+               unsigned long hash,
+               cds_lfht_match_fct match,
+               const void *key,
                struct cds_lfht_node *new_node)
 {
        unsigned long size;
 
+       new_node->reverse_hash = bit_reverse_ulong(hash);
+       if (!old_iter->node)
+               return -ENOENT;
+       if (caa_unlikely(old_iter->node->reverse_hash != new_node->reverse_hash))
+               return -EINVAL;
+       if (caa_unlikely(!match(old_iter->node, key)))
+               return -EINVAL;
        size = rcu_dereference(ht->size);
        return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next,
                        new_node);
 }
 
-int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
 {
-       unsigned long size, hash;
+       unsigned long size;
        int ret;
 
        size = rcu_dereference(ht->size);
-       ret = _cds_lfht_del(ht, size, iter->node, 0);
+       ret = _cds_lfht_del(ht, size, node);
        if (!ret) {
-               hash = bit_reverse_ulong(iter->node->reverse_hash);
+               unsigned long hash;
+
+               hash = bit_reverse_ulong(node->reverse_hash);
                ht_count_del(ht, size, hash);
        }
        return ret;
 }
 
+int cds_lfht_is_node_deleted(struct cds_lfht_node *node)
+{
+       return is_removed(CMM_LOAD_SHARED(node->next));
+}
+
 static
 int cds_lfht_delete_bucket(struct cds_lfht *ht)
 {
@@ -1493,13 +1738,14 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
                if (!is_bucket(node))
                        return -EPERM;
                assert(!is_removed(node));
+               assert(!is_removal_owner(node));
        } while (!is_end(node));
        /*
         * size accessed without rcu_dereference because hash table is
         * being destroyed.
         */
        size = ht->size;
-       /* Internal sanity check: all nodes left should be bucket */
+       /* Internal sanity check: all nodes left should be buckets */
        for (i = 0; i < size; i++) {
                node = bucket_at(ht, i);
                dbg_printf("delete bucket: index %lu expected hash %lu hash %lu\n",
@@ -1507,7 +1753,7 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
                assert(is_bucket(node->next));
        }
 
-       for (order = get_count_order_ulong(size); (long)order >= 0; order--)
+       for (order = cds_lfht_get_count_order_ulong(size); (long)order >= 0; order--)
                cds_lfht_free_bucket_table(ht, order);
 
        return 0;
@@ -1519,13 +1765,25 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
  */
 int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 {
-       int ret;
+       int ret, was_online;
 
        /* Wait for in-flight resize operations to complete */
        _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
        cmm_smp_mb();   /* Store destroy before load resize */
+       was_online = ht->flavor->read_ongoing();
+       if (was_online)
+               ht->flavor->thread_offline();
+       /* Calling with RCU read-side held is an error. */
+       if (ht->flavor->read_ongoing()) {
+               ret = -EINVAL;
+               if (was_online)
+                       ht->flavor->thread_online();
+               goto end;
+       }
        while (uatomic_read(&ht->in_progress_resize))
                poll(NULL, 0, 100);     /* wait for 100ms */
+       if (was_online)
+               ht->flavor->thread_online();
        ret = cds_lfht_delete_bucket(ht);
        if (ret)
                return ret;
@@ -1533,17 +1791,17 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
        if (attr)
                *attr = ht->resize_attr;
        poison_free(ht);
+end:
        return ret;
 }
 
 void cds_lfht_count_nodes(struct cds_lfht *ht,
                long *approx_before,
                unsigned long *count,
-               unsigned long *removed,
                long *approx_after)
 {
        struct cds_lfht_node *node, *next;
-       unsigned long nr_bucket = 0;
+       unsigned long nr_bucket = 0, nr_removed = 0;
 
        *approx_before = 0;
        if (ht->split_count) {
@@ -1556,7 +1814,6 @@ void cds_lfht_count_nodes(struct cds_lfht *ht,
        }
 
        *count = 0;
-       *removed = 0;
 
        /* Count non-bucket nodes in the table */
        node = bucket_at(ht, 0);
@@ -1564,7 +1821,7 @@ void cds_lfht_count_nodes(struct cds_lfht *ht,
                next = rcu_dereference(node->next);
                if (is_removed(next)) {
                        if (!is_bucket(next))
-                               (*removed)++;
+                               (nr_removed)++;
                        else
                                (nr_bucket)++;
                } else if (!is_bucket(next))
@@ -1573,6 +1830,7 @@ void cds_lfht_count_nodes(struct cds_lfht *ht,
                        (nr_bucket)++;
                node = clear_flag(next);
        } while (!is_end(node));
+       dbg_printf("number of logically removed nodes: %lu\n", nr_removed);
        dbg_printf("number of bucket nodes: %lu\n", nr_bucket);
        *approx_after = 0;
        if (ht->split_count) {
@@ -1592,8 +1850,8 @@ void _do_cds_lfht_grow(struct cds_lfht *ht,
 {
        unsigned long old_order, new_order;
 
-       old_order = get_count_order_ulong(old_size);
-       new_order = get_count_order_ulong(new_size);
+       old_order = cds_lfht_get_count_order_ulong(old_size);
+       new_order = cds_lfht_get_count_order_ulong(new_size);
        dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
                   old_size, old_order, new_size, new_order);
        assert(new_size > old_size);
@@ -1608,8 +1866,8 @@ void _do_cds_lfht_shrink(struct cds_lfht *ht,
        unsigned long old_order, new_order;
 
        new_size = max(new_size, MIN_TABLE_SIZE);
-       old_order = get_count_order_ulong(old_size);
-       new_order = get_count_order_ulong(new_size);
+       old_order = cds_lfht_get_count_order_ulong(old_size);
+       new_order = cds_lfht_get_count_order_ulong(new_size);
        dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
                   old_size, old_order, new_size, new_order);
        assert(new_size < old_size);
@@ -1662,13 +1920,30 @@ void resize_target_update_count(struct cds_lfht *ht,
 
 void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
+       int was_online;
+
+       was_online = ht->flavor->read_ongoing();
+       if (was_online)
+               ht->flavor->thread_offline();
+       /* Calling with RCU read-side held is an error. */
+       if (ht->flavor->read_ongoing()) {
+               static int print_once;
+
+               if (!CMM_LOAD_SHARED(print_once))
+                       fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
+                               "called with RCU read-side lock held.\n");
+               CMM_STORE_SHARED(print_once, 1);
+               assert(0);
+               goto end;
+       }
        resize_target_update_count(ht, new_size);
        CMM_STORE_SHARED(ht->resize_initiated, 1);
-       ht->flavor->thread_offline();
        pthread_mutex_lock(&ht->resize_mutex);
        _do_cds_lfht_resize(ht);
        pthread_mutex_unlock(&ht->resize_mutex);
-       ht->flavor->thread_online();
+end:
+       if (was_online)
+               ht->flavor->thread_online();
 }
 
 static
@@ -1703,6 +1978,11 @@ void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
                        return;
                }
                work = malloc(sizeof(*work));
+               if (work == NULL) {
+                       dbg_printf("error allocating resize work, bailing out\n");
+                       uatomic_dec(&ht->in_progress_resize);
+                       return;
+               }
                work->ht = ht;
                ht->flavor->update_call_rcu(&work->head, do_resize_cb);
                CMM_STORE_SHARED(ht->resize_initiated, 1);
This page took 0.035075 seconds and 4 git commands to generate.