Fix rcuja: chain/unchain locking vs retry
[urcu.git] / rcuja / rcuja.c
index 6b54219795cf78ac2eb28583cdb13a214d204878..e6e3a3e6bcc5a10dcc6f0f24354ac5d2788c0573 100644 (file)
@@ -61,33 +61,6 @@ struct cds_ja_type {
        uint16_t pool_size_order;       /* pool size */
 };
 
-/*
- * Number of least significant pointer bits reserved to represent the
- * child type.
- */
-#define JA_TYPE_BITS   3
-#define JA_TYPE_MAX_NR (1UL << JA_TYPE_BITS)
-#define JA_TYPE_MASK   (JA_TYPE_MAX_NR - 1)
-#define JA_PTR_MASK    (~JA_TYPE_MASK)
-
-#define JA_ENTRY_PER_NODE      256UL
-#define JA_LOG2_BITS_PER_BYTE  3U
-#define JA_BITS_PER_BYTE       (1U << JA_LOG2_BITS_PER_BYTE)
-
-#define JA_MAX_DEPTH   9       /* Maximum depth, including leafs */
-
-/*
- * Entry for NULL node is at index 8 of the table. It is never encoded
- * in flags.
- */
-#define NODE_INDEX_NULL                8
-
-/*
- * Number of removals needed on a fallback node before we try to shrink
- * it.
- */
-#define JA_FALLBACK_REMOVAL_COUNT      8
-
 /*
  * Iteration on the array to find the right node size for the number of
  * children stops when it reaches .max_child == 256 (this is the largest
@@ -266,33 +239,6 @@ enum ja_recompact {
        JA_RECOMPACT_DEL,
 };
 
-static
-struct cds_ja_inode_flag *ja_node_flag(struct cds_ja_inode *node,
-               unsigned long type)
-{
-       assert(type < (1UL << JA_TYPE_BITS));
-       return (struct cds_ja_inode_flag *) (((unsigned long) node) | type);
-}
-
-static
-struct cds_ja_inode *ja_node_ptr(struct cds_ja_inode_flag *node)
-{
-       return (struct cds_ja_inode *) (((unsigned long) node) & JA_PTR_MASK);
-}
-
-static
-unsigned long ja_node_type(struct cds_ja_inode_flag *node)
-{
-       unsigned long type;
-
-       if (ja_node_ptr(node) == NULL) {
-               return NODE_INDEX_NULL;
-       }
-       type = (unsigned int) ((unsigned long) node & JA_TYPE_MASK);
-       assert(type < (1UL << JA_TYPE_BITS));
-       return type;
-}
-
 struct cds_ja_inode *alloc_cds_ja_node(const struct cds_ja_type *ja_type)
 {
        return calloc(1U << ja_type->order, sizeof(char));
@@ -331,6 +277,7 @@ static
 struct cds_ja_inode_flag *ja_linear_node_get_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        uint8_t nr_child;
@@ -351,12 +298,17 @@ struct cds_ja_inode_flag *ja_linear_node_get_nth(const struct cds_ja_type *type,
                if (CMM_LOAD_SHARED(values[i]) == n)
                        break;
        }
-       if (i >= nr_child)
+       if (i >= nr_child) {
+               if (caa_unlikely(node_flag_ptr))
+                       *node_flag_ptr = NULL;
                return NULL;
+       }
        pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
        ptr = rcu_dereference(pointers[i]);
        if (caa_unlikely(child_node_flag_ptr) && ptr)
                *child_node_flag_ptr = &pointers[i];
+       if (caa_unlikely(node_flag_ptr))
+               *node_flag_ptr = &pointers[i];
        return ptr;
 }
 
@@ -383,6 +335,7 @@ static
 struct cds_ja_inode_flag *ja_pool_node_get_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        struct cds_ja_inode *linear;
@@ -394,7 +347,8 @@ struct cds_ja_inode_flag *ja_pool_node_get_nth(const struct cds_ja_type *type,
         */
        linear = (struct cds_ja_inode *)
                &node->u.data[((unsigned long) n >> (CHAR_BIT - type->nr_pool_order)) << type->pool_size_order];
-       return ja_linear_node_get_nth(type, linear, child_node_flag_ptr, n);
+       return ja_linear_node_get_nth(type, linear, child_node_flag_ptr,
+               node_flag_ptr, n);
 }
 
 static
@@ -411,6 +365,7 @@ static
 struct cds_ja_inode_flag *ja_pigeon_node_get_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        struct cds_ja_inode_flag **child_node_flag;
@@ -421,6 +376,8 @@ struct cds_ja_inode_flag *ja_pigeon_node_get_nth(const struct cds_ja_type *type,
                child_node_flag);
        if (caa_unlikely(child_node_flag_ptr) && *child_node_flag)
                *child_node_flag_ptr = child_node_flag;
+       if (caa_unlikely(node_flag_ptr))
+               *node_flag_ptr = child_node_flag;
        return rcu_dereference(*child_node_flag);
 }
 
@@ -429,7 +386,7 @@ struct cds_ja_inode_flag *ja_pigeon_node_get_ith_pos(const struct cds_ja_type *t
                struct cds_ja_inode *node,
                uint8_t i)
 {
-       return ja_pigeon_node_get_nth(type, node, NULL, i);
+       return ja_pigeon_node_get_nth(type, node, NULL, NULL, i);
 }
 
 /*
@@ -439,6 +396,7 @@ struct cds_ja_inode_flag *ja_pigeon_node_get_ith_pos(const struct cds_ja_type *t
 static
 struct cds_ja_inode_flag * ja_node_get_nth(struct cds_ja_inode_flag *node_flag,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        unsigned int type_index;
@@ -453,13 +411,13 @@ struct cds_ja_inode_flag * ja_node_get_nth(struct cds_ja_inode_flag *node_flag,
        switch (type->type_class) {
        case RCU_JA_LINEAR:
                return ja_linear_node_get_nth(type, node,
-                               child_node_flag_ptr, n);
+                               child_node_flag_ptr, node_flag_ptr, n);
        case RCU_JA_POOL:
                return ja_pool_node_get_nth(type, node,
-                               child_node_flag_ptr, n);
+                               child_node_flag_ptr, node_flag_ptr, n);
        case RCU_JA_PIGEON:
                return ja_pigeon_node_get_nth(type, node,
-                               child_node_flag_ptr, n);
+                               child_node_flag_ptr, node_flag_ptr, n);
        default:
                assert(0);
                return (void *) -1UL;
@@ -653,6 +611,7 @@ int ja_pigeon_node_clear_ptr(const struct cds_ja_type *type,
                struct cds_ja_inode_flag **node_flag_ptr)
 {
        assert(type->type_class == RCU_JA_PIGEON);
+       dbg_printf("ja_pigeon_node_clear_ptr: clearing ptr: %p\n", *node_flag_ptr);
        rcu_assign_pointer(*node_flag_ptr, NULL);
        shadow_node->nr_child--;
        return 0;
@@ -699,7 +658,7 @@ int ja_node_recompact(enum ja_recompact mode,
                const struct cds_ja_type *old_type,
                struct cds_ja_inode *old_node,
                struct cds_ja_shadow_node *shadow_node,
-               struct cds_ja_inode_flag **old_node_flag, uint8_t n,
+               struct cds_ja_inode_flag **old_node_flag_ptr, uint8_t n,
                struct cds_ja_inode_flag *child_node_flag,
                struct cds_ja_inode_flag **nullify_node_flag_ptr)
 {
@@ -707,10 +666,12 @@ int ja_node_recompact(enum ja_recompact mode,
        struct cds_ja_inode *new_node;
        struct cds_ja_shadow_node *new_shadow_node = NULL;
        const struct cds_ja_type *new_type;
-       struct cds_ja_inode_flag *new_node_flag;
+       struct cds_ja_inode_flag *new_node_flag, *old_node_flag;
        int ret;
        int fallback = 0;
 
+       old_node_flag = *old_node_flag_ptr;
+
        switch (mode) {
        case JA_RECOMPACT:
                new_type_index = old_type_index;
@@ -743,7 +704,7 @@ retry:              /* for fallback */
                        return -ENOMEM;
                new_node_flag = ja_node_flag(new_node, new_type_index);
                dbg_printf("Recompact inherit lock from %p\n", shadow_node);
-               new_shadow_node = rcuja_shadow_set(ja->ht, new_node, shadow_node);
+               new_shadow_node = rcuja_shadow_set(ja->ht, new_node_flag, shadow_node, ja);
                if (!new_shadow_node) {
                        free(new_node);
                        return -ENOMEM;
@@ -856,15 +817,15 @@ retry:            /* for fallback */
        }
 skip_copy:
 
-       if (JA_RECOMPACT_ADD) {
+       if (mode == JA_RECOMPACT_ADD) {
                /* add node */
                ret = _ja_node_set_nth(new_type, new_node,
                                new_shadow_node,
                                n, child_node_flag);
                assert(!ret);
        }
-       /* Return pointer to new recompacted node through old_node_flag */
-       *old_node_flag = new_node_flag;
+       /* Return pointer to new recompacted node through old_node_flag_ptr */
+       *old_node_flag_ptr = new_node_flag;
        if (old_node) {
                int flags;
 
@@ -876,7 +837,7 @@ skip_copy:
                 */
                if (new_type_index == NODE_INDEX_NULL)
                        flags = RCUJA_SHADOW_CLEAR_FREE_LOCK;
-               ret = rcuja_shadow_clear(ja->ht, old_node, shadow_node,
+               ret = rcuja_shadow_clear(ja->ht, old_node_flag, shadow_node,
                                flags);
                assert(!ret);
        }
@@ -888,7 +849,7 @@ end:
 fallback_toosmall:
        /* fallback if next pool is too small */
        assert(new_shadow_node);
-       ret = rcuja_shadow_clear(ja->ht, new_node, new_shadow_node,
+       ret = rcuja_shadow_clear(ja->ht, new_node_flag, new_shadow_node,
                        RCUJA_SHADOW_CLEAR_FREE_NODE);
        assert(!ret);
 
@@ -989,7 +950,7 @@ struct cds_hlist_head cds_ja_lookup(struct cds_ja *ja, uint64_t key)
                uint8_t iter_key;
 
                iter_key = (uint8_t) (key >> (JA_BITS_PER_BYTE * (tree_depth - i - 1)));
-               node_flag = ja_node_get_nth(node_flag, NULL,
+               node_flag = ja_node_get_nth(node_flag, NULL, NULL,
                        iter_key);
                dbg_printf("cds_ja_lookup iter key lookup %u finds node_flag %p\n",
                                (unsigned int) iter_key, node_flag);
@@ -1016,6 +977,7 @@ struct cds_hlist_head cds_ja_lookup(struct cds_ja *ja, uint64_t key)
  */
 static
 int ja_attach_node(struct cds_ja *ja,
+               struct cds_ja_inode_flag **attach_node_flag_ptr,
                struct cds_ja_inode_flag **node_flag_ptr,
                struct cds_ja_inode_flag *node_flag,
                struct cds_ja_inode_flag *parent_node_flag,
@@ -1037,20 +999,30 @@ int ja_attach_node(struct cds_ja *ja,
                level, node, node_flag);
 
        assert(node);
-       shadow_node = rcuja_shadow_lookup_lock(ja->ht, node);
+       shadow_node = rcuja_shadow_lookup_lock(ja->ht, node_flag);
        if (!shadow_node) {
                ret = -EAGAIN;
                goto end;
        }
        if (parent_node) {
                parent_shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-                                               parent_node);
+                                               parent_node_flag);
                if (!parent_shadow_node) {
                        ret = -EAGAIN;
                        goto unlock_shadow;
                }
        }
 
+       if (node_flag_ptr && ja_node_ptr(*node_flag_ptr)) {
+               /*
+                * Target node is non-NULL: it has been updated between
+                * RCU lookup and lock acquisition. We need to re-try
+                * lookup and attach.
+                */
+               ret = -EAGAIN;
+               goto unlock_parent;
+       }
+
        /* Create new branch, starting from bottom */
        CDS_INIT_HLIST_HEAD(&head);
        cds_hlist_add_head_rcu(&child_node->list, &head);
@@ -1091,8 +1063,8 @@ int ja_attach_node(struct cds_ja *ja,
 
        /* Publish new branch */
        dbg_printf("Publish branch %p, replacing %p\n",
-               iter_node_flag, *node_flag_ptr);
-       rcu_assign_pointer(*node_flag_ptr, iter_node_flag);
+               iter_node_flag, *attach_node_flag_ptr);
+       rcu_assign_pointer(*attach_node_flag_ptr, iter_node_flag);
 
        /* Success */
        ret = 0;
@@ -1107,12 +1079,13 @@ check_error:
                        if (i)
                                flags |= RCUJA_SHADOW_CLEAR_FREE_NODE;
                        tmpret = rcuja_shadow_clear(ja->ht,
-                                       ja_node_ptr(created_nodes[i]),
+                                       created_nodes[i],
                                        NULL,
                                        flags);
                        assert(!tmpret);
                }
        }
+unlock_parent:
        if (parent_shadow_node)
                rcuja_shadow_unlock(parent_shadow_node);
 unlock_shadow:
@@ -1131,32 +1104,41 @@ end:
 static
 int ja_chain_node(struct cds_ja *ja,
                struct cds_ja_inode_flag *parent_node_flag,
+               struct cds_ja_inode_flag **node_flag_ptr,
                struct cds_hlist_head *head,
                struct cds_ja_node *node)
 {
        struct cds_ja_shadow_node *shadow_node;
+       int ret = 0;
 
-       shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-               ja_node_ptr(parent_node_flag));
-       if (!shadow_node)
+       shadow_node = rcuja_shadow_lookup_lock(ja->ht, parent_node_flag);
+       if (!shadow_node) {
                return -EAGAIN;
+       }
+       if (!ja_node_ptr(*node_flag_ptr)) {
+               ret = -EAGAIN;
+               goto end;
+       }
        cds_hlist_add_head_rcu(&node->list, head);
+end:
        rcuja_shadow_unlock(shadow_node);
-       return 0;
+       return ret;
 }
 
 int cds_ja_add(struct cds_ja *ja, uint64_t key,
                struct cds_ja_node *new_node)
 {
        unsigned int tree_depth, i;
-       struct cds_ja_inode_flag **node_flag_ptr;       /* in parent */
+       struct cds_ja_inode_flag **attach_node_flag_ptr,
+               **node_flag_ptr;
        struct cds_ja_inode_flag *node_flag,
                *parent_node_flag,
                *parent2_node_flag;
        int ret;
 
-       if (caa_unlikely(key > ja->key_max))
+       if (caa_unlikely(key > ja->key_max)) {
                return -EINVAL;
+       }
        tree_depth = ja->tree_depth;
 
 retry:
@@ -1165,6 +1147,7 @@ retry:
        parent2_node_flag = NULL;
        parent_node_flag =
                (struct cds_ja_inode_flag *) &ja->root; /* Use root ptr address as key for mutex */
+       attach_node_flag_ptr = &ja->root;
        node_flag_ptr = &ja->root;
        node_flag = rcu_dereference(ja->root);
 
@@ -1172,10 +1155,11 @@ retry:
        for (i = 1; i < tree_depth; i++) {
                uint8_t iter_key;
 
-               dbg_printf("cds_ja_add iter node_flag_ptr %p node_flag %p\n",
-                               *node_flag_ptr, node_flag);
+               dbg_printf("cds_ja_add iter attach_node_flag_ptr %p node_flag_ptr %p node_flag %p\n",
+                               *attach_node_flag_ptr, *node_flag_ptr, node_flag);
                if (!ja_node_ptr(node_flag)) {
-                       ret = ja_attach_node(ja, node_flag_ptr,
+                       ret = ja_attach_node(ja, attach_node_flag_ptr,
+                                       node_flag_ptr,
                                        parent_node_flag, parent2_node_flag,
                                        key, i, new_node);
                        if (ret == -EAGAIN || ret == -EEXIST)
@@ -1187,10 +1171,13 @@ retry:
                parent2_node_flag = parent_node_flag;
                parent_node_flag = node_flag;
                node_flag = ja_node_get_nth(node_flag,
+                       &attach_node_flag_ptr,
                        &node_flag_ptr,
                        iter_key);
-               dbg_printf("cds_ja_add iter key lookup %u finds node_flag %p node_flag_ptr %p\n",
-                               (unsigned int) iter_key, node_flag, *node_flag_ptr);
+               dbg_printf("cds_ja_add iter key lookup %u finds node_flag %p attach_node_flag_ptr %p node_flag_ptr %p\n",
+                               (unsigned int) iter_key, node_flag,
+                               *attach_node_flag_ptr,
+                               *node_flag_ptr);
        }
 
        /*
@@ -1198,17 +1185,19 @@ retry:
         * level, or chain it if key is already present.
         */
        if (!ja_node_ptr(node_flag)) {
-               dbg_printf("cds_ja_add last node_flag_ptr %p node_flag %p\n",
-                               *node_flag_ptr, node_flag);
-               ret = ja_attach_node(ja, node_flag_ptr, parent_node_flag,
+               dbg_printf("cds_ja_add last attach_node_flag_ptr %p node_flag_ptr %p node_flag %p\n",
+                               *attach_node_flag_ptr, *node_flag_ptr, node_flag);
+               ret = ja_attach_node(ja, attach_node_flag_ptr,
+                               node_flag_ptr, parent_node_flag,
                                parent2_node_flag, key, i, new_node);
        } else {
                ret = ja_chain_node(ja,
                        parent_node_flag,
-                       (struct cds_hlist_head *) node_flag_ptr,
+                       node_flag_ptr,
+                       (struct cds_hlist_head *) attach_node_flag_ptr,
                        new_node);
        }
-       if (ret == -EAGAIN)
+       if (ret == -EAGAIN || ret == -EEXIST)
                goto retry;
 end:
        return ret;
@@ -1221,6 +1210,9 @@ end:
  * ensure that when a match value -> pointer is found in a node, it is
  * _NEVER_ changed for that node without recompaction, and recompaction
  * reallocates the node.
+ * However, when a child is removed from "linear" nodes, its pointer
+ * is set to NULL. We therefore check, while holding the locks, if this
+ * pointer is NULL, and return -ENOENT to the caller if it is the case.
  */
 static
 int ja_detach_node(struct cds_ja *ja,
@@ -1236,10 +1228,10 @@ int ja_detach_node(struct cds_ja *ja,
                        *parent_node_flag = NULL,
                        **parent_node_flag_ptr = NULL;
        struct cds_ja_inode_flag *iter_node_flag;
-       int ret, i, nr_shadow = 0, nr_clear = 0;
-       uint8_t n;
+       int ret, i, nr_shadow = 0, nr_clear = 0, nr_branch = 0;
+       uint8_t n = 0;
 
-       assert(nr_snapshot == ja->tree_depth - 1);
+       assert(nr_snapshot == ja->tree_depth + 1);
 
        /*
         * From the last internal level node going up, get the node
@@ -1248,48 +1240,40 @@ int ja_detach_node(struct cds_ja *ja,
         * which has more that one child left, we lock the parent, and
         * proceed to the node deletion (removing its children too).
         */
-       for (i = nr_snapshot - 1; i >= 1; i--) {
+       for (i = nr_snapshot - 2; i >= 1; i--) {
                struct cds_ja_shadow_node *shadow_node;
 
                shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-                                       ja_node_ptr(snapshot[i]));
+                                       snapshot[i]);
                if (!shadow_node) {
                        ret = -EAGAIN;
                        goto end;
                }
                assert(shadow_node->nr_child > 0);
                shadow_nodes[nr_shadow++] = shadow_node;
-               nr_clear++;
-               if (i == nr_snapshot - 1) {
-                       /*
-                        * Re-check that last internal node level has
-                        * only one child, else trigger a retry.
-                        */
-                       if (shadow_node->nr_child != 1) {
-                               ret = -EAGAIN;
-                               goto end;
-                       }
-               }
+               if (shadow_node->nr_child == 1 && i > 1)
+                       nr_clear++;
+               nr_branch++;
                if (shadow_node->nr_child > 1 || i == 1) {
                        /* Lock parent and break */
                        shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-                                       ja_node_ptr(snapshot[i - 1]));
+                                       snapshot[i - 1]);
                        if (!shadow_node) {
                                ret = -EAGAIN;
                                goto end;
                        }
                        shadow_nodes[nr_shadow++] = shadow_node;
-                       node_flag_ptr = snapshot_ptr[i];
-                       n = snapshot_n[i];
-                       parent_node_flag_ptr = snapshot_ptr[i - 1];
-                       parent_node_flag = snapshot[i - 1];
+                       node_flag_ptr = snapshot_ptr[i + 1];
+                       n = snapshot_n[i + 1];
+                       parent_node_flag_ptr = snapshot_ptr[i];
+                       parent_node_flag = snapshot[i];
                        if (i > 1) {
                                /*
                                 * Lock parent's parent, in case we need
                                 * to recompact parent.
                                 */
                                shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-                                               ja_node_ptr(snapshot[i - 2]));
+                                               snapshot[i - 2]);
                                if (!shadow_node) {
                                        ret = -EAGAIN;
                                        goto end;
@@ -1301,16 +1285,26 @@ int ja_detach_node(struct cds_ja *ja,
        }
 
        /*
-        * At this point, we want to delete all nodes in shadow_nodes
-        * (except the last one, which is either the root or the parent
-        * of the upmost node with 1 child). OK to as to free lock here,
-        * because RCU read lock is held, and free only performed in
-        * call_rcu.
+        * Check if node has been removed between RCU lookup and lock
+        * acquisition.
+        */
+       assert(node_flag_ptr);
+       if (!ja_node_ptr(*node_flag_ptr)) {
+               ret = -ENOENT;
+               goto end;
+       }
+
+       /*
+        * At this point, we want to delete all nodes that are about to
+        * be removed from shadow_nodes (except the last one, which is
+        * either the root or the parent of the upmost node with 1
+        * child). OK to as to free lock here, because RCU read lock is
+        * held, and free only performed in call_rcu.
         */
 
        for (i = 0; i < nr_clear; i++) {
                ret = rcuja_shadow_clear(ja->ht,
-                               shadow_nodes[i]->node,
+                               shadow_nodes[i]->node_flag,
                                shadow_nodes[i],
                                RCUJA_SHADOW_CLEAR_FREE_NODE
                                | RCUJA_SHADOW_CLEAR_FREE_LOCK);
@@ -1322,9 +1316,13 @@ int ja_detach_node(struct cds_ja *ja,
        ret = ja_node_clear_ptr(ja,
                node_flag_ptr,          /* Pointer to location to nullify */
                &iter_node_flag,        /* Old new parent ptr in its parent */
-               shadow_nodes[nr_clear], /* of parent */
+               shadow_nodes[nr_branch - 1],    /* of parent */
                n);
+       if (ret)
+               goto end;
 
+       dbg_printf("ja_detach_node: publish %p instead of %p\n",
+               iter_node_flag, *parent_node_flag_ptr);
        /* Update address of parent ptr in its parent */
        rcu_assign_pointer(*parent_node_flag_ptr, iter_node_flag);
 
@@ -1337,20 +1335,30 @@ end:
 static
 int ja_unchain_node(struct cds_ja *ja,
                struct cds_ja_inode_flag *parent_node_flag,
+               struct cds_ja_inode_flag **node_flag_ptr,
+               struct cds_hlist_head *head,
                struct cds_ja_node *node)
 {
        struct cds_ja_shadow_node *shadow_node;
-       int ret = 0;
+       struct cds_hlist_node *hlist_node;
+       int ret = 0, count = 0;
 
-       shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-               ja_node_ptr(parent_node_flag));
+       shadow_node = rcuja_shadow_lookup_lock(ja->ht, parent_node_flag);
        if (!shadow_node)
                return -EAGAIN;
+       if (!ja_node_ptr(*node_flag_ptr)) {
+               ret = -EAGAIN;
+               goto end;
+       }
        /*
         * Retry if another thread removed all but one of duplicates
-        * since check.
+        * since check (this check was performed without lock).
         */
-       if (shadow_node->nr_child == 1) {
+       cds_hlist_for_each_rcu(hlist_node, head, list) {
+               count++;
+       }
+
+       if (count == 1) {
                ret = -EAGAIN;
                goto end;
        }
@@ -1371,8 +1379,9 @@ int cds_ja_del(struct cds_ja *ja, uint64_t key,
        struct cds_ja_inode_flag **snapshot_ptr[JA_MAX_DEPTH];
        uint8_t snapshot_n[JA_MAX_DEPTH];
        struct cds_ja_inode_flag *node_flag;
-       struct cds_ja_inode_flag **prev_node_flag_ptr;
-       int nr_snapshot = 0;
+       struct cds_ja_inode_flag **prev_node_flag_ptr,
+               **node_flag_ptr;
+       int nr_snapshot;
        int ret;
 
        if (caa_unlikely(key > ja->key_max))
@@ -1380,15 +1389,18 @@ int cds_ja_del(struct cds_ja *ja, uint64_t key,
        tree_depth = ja->tree_depth;
 
 retry:
+       nr_snapshot = 0;
        dbg_printf("cds_ja_del attempt: key %" PRIu64 ", node %p\n",
                key, node);
 
        /* snapshot for level 0 is only for shadow node lookup */
-       snapshot_n[nr_snapshot] = 0;
+       snapshot_n[0] = 0;
+       snapshot_n[1] = 0;
        snapshot_ptr[nr_snapshot] = NULL;
        snapshot[nr_snapshot++] = (struct cds_ja_inode_flag *) &ja->root;
        node_flag = rcu_dereference(ja->root);
        prev_node_flag_ptr = &ja->root;
+       node_flag_ptr = &ja->root;
 
        /* Iterate on all internal levels */
        for (i = 1; i < tree_depth; i++) {
@@ -1400,15 +1412,12 @@ retry:
                        return -ENOENT;
                }
                iter_key = (uint8_t) (key >> (JA_BITS_PER_BYTE * (tree_depth - i - 1)));
-               if (nr_snapshot <= 1)
-                       snapshot_n[nr_snapshot] = 0;
-               else
-                       snapshot_n[nr_snapshot - 1] = iter_key;
-
+               snapshot_n[nr_snapshot + 1] = iter_key;
                snapshot_ptr[nr_snapshot] = prev_node_flag_ptr;
                snapshot[nr_snapshot++] = node_flag;
                node_flag = ja_node_get_nth(node_flag,
                        &prev_node_flag_ptr,
+                       &node_flag_ptr,
                        iter_key);
                dbg_printf("cds_ja_del iter key lookup %u finds node_flag %p, prev_node_flag_ptr %p\n",
                                (unsigned int) iter_key, node_flag,
@@ -1420,38 +1429,51 @@ retry:
         * to remove. Fail if we cannot find it.
         */
        if (!ja_node_ptr(node_flag)) {
+               dbg_printf("cds_ja_del: no node found for key %" PRIu64 "\n",
+                               key);
                return -ENOENT;
        } else {
-               struct cds_hlist_head *hlist_head;
+               struct cds_hlist_head hlist_head;
                struct cds_hlist_node *hlist_node;
                struct cds_ja_node *entry, *match = NULL;
                int count = 0;
 
-               hlist_head = (struct cds_hlist_head *) ja_node_ptr(node_flag);
+               hlist_head.next =
+                       (struct cds_hlist_node *) ja_node_ptr(node_flag);
                cds_hlist_for_each_entry_rcu(entry,
                                hlist_node,
-                               hlist_head,
+                               &hlist_head,
                                list) {
+                       dbg_printf("cds_ja_del: compare %p with entry %p\n", node, entry);
                        if (entry == node)
                                match = entry;
                        count++;
                }
-               if (!match)
+               if (!match) {
+                       dbg_printf("cds_ja_del: no node match for node %p key %" PRIu64 "\n", node, key);
                        return -ENOENT;
+               }
                assert(count > 0);
                if (count == 1) {
                        /*
-                        * Removing last of duplicates.
+                        * Removing last of duplicates. Last snapshot
+                        * does not have a shadow node (external leafs).
                         */
                        snapshot_ptr[nr_snapshot] = prev_node_flag_ptr;
                        snapshot[nr_snapshot++] = node_flag;
                        ret = ja_detach_node(ja, snapshot, snapshot_ptr,
                                        snapshot_n, nr_snapshot, key, node);
                } else {
-                       ret = ja_unchain_node(ja, node_flag, entry);
+                       ret = ja_unchain_node(ja, snapshot[nr_snapshot - 1],
+                               node_flag_ptr, &hlist_head, match);
                }
        }
-       if (ret == -EAGAIN)
+       /*
+        * Explanation of -ENOENT handling: caused by concurrent delete
+        * between RCU lookup and actual removal. Need to re-do the
+        * lookup and removal attempt.
+        */
+       if (ret == -EAGAIN || ret == -ENOENT)
                goto retry;
        return ret;
 }
@@ -1469,13 +1491,13 @@ struct cds_ja *_cds_ja_new(unsigned int key_bits,
 
        switch (key_bits) {
        case 8:
-               ja->key_max = UINT8_MAX;
-               break;
        case 16:
-               ja->key_max = UINT16_MAX;
-               break;
+       case 24:
        case 32:
-               ja->key_max = UINT32_MAX;
+       case 40:
+       case 48:
+       case 56:
+               ja->key_max = (1ULL << key_bits) - 1;
                break;
        case 64:
                ja->key_max = UINT64_MAX;
@@ -1496,13 +1518,13 @@ struct cds_ja *_cds_ja_new(unsigned int key_bits,
         * Note: we should not free this node until judy array destroy.
         */
        root_shadow_node = rcuja_shadow_set(ja->ht,
-                       ja_node_ptr((struct cds_ja_inode_flag *) &ja->root),
-                       NULL);
+                       (struct cds_ja_inode_flag *) &ja->root,
+                       NULL, ja);
        if (!root_shadow_node) {
                ret = -ENOMEM;
                goto ht_node_error;
        }
-       root_shadow_node->is_root = 1;
+       root_shadow_node->level = 0;
 
        return ja;
 
@@ -1516,16 +1538,119 @@ ja_error:
        return NULL;
 }
 
+/*
+ * Called from RCU read-side CS.
+ */
+__attribute__((visibility("protected")))
+void rcuja_free_all_children(struct cds_ja_shadow_node *shadow_node,
+               struct cds_ja_inode_flag *node_flag,
+               void (*free_node_cb)(struct rcu_head *head))
+{
+       const struct rcu_flavor_struct *flavor;
+       unsigned int type_index;
+       struct cds_ja_inode *node;
+       const struct cds_ja_type *type;
+
+       flavor = cds_lfht_rcu_flavor(shadow_node->ja->ht);
+       node = ja_node_ptr(node_flag);
+       assert(node != NULL);
+       type_index = ja_node_type(node_flag);
+       type = &ja_types[type_index];
+
+       switch (type->type_class) {
+       case RCU_JA_LINEAR:
+       {
+               uint8_t nr_child =
+                       ja_linear_node_get_nr_child(type, node);
+               unsigned int i;
+
+               for (i = 0; i < nr_child; i++) {
+                       struct cds_ja_inode_flag *iter;
+                       struct cds_hlist_head head;
+                       struct cds_ja_node *entry;
+                       struct cds_hlist_node *pos;
+                       uint8_t v;
+
+                       ja_linear_node_get_ith_pos(type, node, i, &v, &iter);
+                       if (!iter)
+                               continue;
+                       head.next = (struct cds_hlist_node *) iter;
+                       cds_hlist_for_each_entry_rcu(entry, pos, &head, list) {
+                               flavor->update_call_rcu(&entry->head, free_node_cb);
+                       }
+               }
+               break;
+       }
+       case RCU_JA_POOL:
+       {
+               unsigned int pool_nr;
+
+               for (pool_nr = 0; pool_nr < (1U << type->nr_pool_order); pool_nr++) {
+                       struct cds_ja_inode *pool =
+                               ja_pool_node_get_ith_pool(type, node, pool_nr);
+                       uint8_t nr_child =
+                               ja_linear_node_get_nr_child(type, pool);
+                       unsigned int j;
+
+                       for (j = 0; j < nr_child; j++) {
+                               struct cds_ja_inode_flag *iter;
+                               struct cds_hlist_head head;
+                               struct cds_ja_node *entry;
+                               struct cds_hlist_node *pos;
+                               uint8_t v;
+
+                               ja_linear_node_get_ith_pos(type, node, j, &v, &iter);
+                               if (!iter)
+                                       continue;
+                               head.next = (struct cds_hlist_node *) iter;
+                               cds_hlist_for_each_entry_rcu(entry, pos, &head, list) {
+                                       flavor->update_call_rcu(&entry->head, free_node_cb);
+                               }
+                       }
+               }
+               break;
+       }
+       case RCU_JA_NULL:
+               break;
+       case RCU_JA_PIGEON:
+       {
+               uint8_t nr_child;
+               unsigned int i;
+
+               nr_child = shadow_node->nr_child;
+               for (i = 0; i < nr_child; i++) {
+                       struct cds_ja_inode_flag *iter;
+                       struct cds_hlist_head head;
+                       struct cds_ja_node *entry;
+                       struct cds_hlist_node *pos;
+
+                       iter = ja_pigeon_node_get_ith_pos(type, node, i);
+                       if (!iter)
+                               continue;
+                       head.next = (struct cds_hlist_node *) iter;
+                       cds_hlist_for_each_entry_rcu(entry, pos, &head, list) {
+                               flavor->update_call_rcu(&entry->head, free_node_cb);
+                       }
+               }
+               break;
+       }
+       default:
+               assert(0);
+       }
+}
+
 /*
  * There should be no more concurrent add to the judy array while it is
  * being destroyed (ensured by the caller).
  */
-int cds_ja_destroy(struct cds_ja *ja)
+int cds_ja_destroy(struct cds_ja *ja,
+               void (*free_node_cb)(struct rcu_head *head))
 {
        int ret;
 
        rcuja_shadow_prune(ja->ht,
-               RCUJA_SHADOW_CLEAR_FREE_NODE | RCUJA_SHADOW_CLEAR_FREE_LOCK);
+               RCUJA_SHADOW_CLEAR_FREE_NODE | RCUJA_SHADOW_CLEAR_FREE_LOCK,
+               free_node_cb);
        ret = rcuja_delete_ht(ja->ht);
        if (ret)
                return ret;
This page took 0.033759 seconds and 4 git commands to generate.