#include <stdint.h>
#include "rcuja-internal.h"
-#include "bitfield.h"
#ifndef abs
#define abs_int(a) ((int) (a) > 0 ? (int) (a) : -((int) (a)))
return ptr;
}
+static
+struct cds_ja_inode_flag *ja_linear_node_get_left(const struct cds_ja_type *type,
+ struct cds_ja_inode *node,
+ unsigned int n)
+{
+ uint8_t nr_child;
+ uint8_t *values;
+ struct cds_ja_inode_flag **pointers;
+ struct cds_ja_inode_flag *ptr;
+ unsigned int i, match_idx;
+ int match_v = -1;
+
+ assert(type->type_class == RCU_JA_LINEAR || type->type_class == RCU_JA_POOL);
+
+ nr_child = ja_linear_node_get_nr_child(type, node);
+ cmm_smp_rmb(); /* read nr_child before values and pointers */
+ assert(nr_child <= type->max_linear_child);
+ assert(type->type_class != RCU_JA_LINEAR || nr_child >= type->min_child);
+
+ values = &node->u.data[1];
+ for (i = 0; i < nr_child; i++) {
+ unsigned int v;
+
+ v = CMM_LOAD_SHARED(values[i]);
+ if (v < n && (int) v > match_v) {
+ match_v = v;
+ match_idx = i;
+ }
+ }
+ if (match_v < 0) {
+ return NULL;
+ }
+ pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
+ ptr = rcu_dereference(pointers[match_idx]);
+ return ptr;
+}
+
static
void ja_linear_node_get_ith_pos(const struct cds_ja_type *type,
struct cds_ja_inode *node,
&node->u.data[(unsigned int) i << type->pool_size_order];
}
+static
+struct cds_ja_inode_flag *ja_pool_node_get_left(const struct cds_ja_type *type,
+ struct cds_ja_inode *node,
+ unsigned int n)
+{
+ unsigned int pool_nr;
+ int match_v = -1;
+ struct cds_ja_inode_flag *match_node_flag = NULL;
+
+ assert(type->type_class == RCU_JA_POOL);
+
+ for (pool_nr = 0; pool_nr < (1U << type->nr_pool_order); pool_nr++) {
+ struct cds_ja_inode *pool =
+ ja_pool_node_get_ith_pool(type,
+ node, pool_nr);
+ uint8_t nr_child =
+ ja_linear_node_get_nr_child(type, pool);
+ unsigned int j;
+
+ for (j = 0; j < nr_child; j++) {
+ struct cds_ja_inode_flag *iter;
+ uint8_t v;
+
+ ja_linear_node_get_ith_pos(type, pool,
+ j, &v, &iter);
+ if (!iter)
+ continue;
+ if (v < n && (int) v > match_v) {
+ match_v = v;
+ match_node_flag = iter;
+ }
+ }
+ }
+ return match_node_flag;
+}
+
static
struct cds_ja_inode_flag *ja_pigeon_node_get_nth(const struct cds_ja_type *type,
struct cds_ja_inode *node,
return child_node_flag;
}
+static
+struct cds_ja_inode_flag *ja_pigeon_node_get_left(const struct cds_ja_type *type,
+ struct cds_ja_inode *node,
+ unsigned int n)
+{
+ struct cds_ja_inode_flag **child_node_flag_ptr;
+ struct cds_ja_inode_flag *child_node_flag;
+ int i;
+
+ assert(type->type_class == RCU_JA_PIGEON);
+
+ /* n - 1 is first value left of n */
+ for (i = n - 1; i >= 0; i--) {
+ child_node_flag_ptr = &((struct cds_ja_inode_flag **) node->u.data)[i];
+ child_node_flag = rcu_dereference(*child_node_flag_ptr);
+ if (child_node_flag) {
+ dbg_printf("ja_pigeon_node_get_left child_node_flag %p\n",
+ child_node_flag);
+ return child_node_flag;
+ }
+ }
+ return NULL;
+}
+
static
struct cds_ja_inode_flag *ja_pigeon_node_get_ith_pos(const struct cds_ja_type *type,
struct cds_ja_inode *node,
}
}
+static
+struct cds_ja_inode_flag *ja_node_get_left(struct cds_ja_inode_flag *node_flag,
+ unsigned int n)
+{
+ unsigned int type_index;
+ struct cds_ja_inode *node;
+ const struct cds_ja_type *type;
+
+ node = ja_node_ptr(node_flag);
+ assert(node != NULL);
+ type_index = ja_node_type(node_flag);
+ type = &ja_types[type_index];
+
+ switch (type->type_class) {
+ case RCU_JA_LINEAR:
+ return ja_linear_node_get_left(type, node, n);
+ case RCU_JA_POOL:
+ return ja_pool_node_get_left(type, node, n);
+ case RCU_JA_PIGEON:
+ return ja_pigeon_node_get_left(type, node, n);
+ default:
+ assert(0);
+ return (void *) -1UL;
+ }
+}
+
+static
+struct cds_ja_inode_flag *ja_node_get_rightmost(struct cds_ja_inode_flag *node_flag)
+{
+ return ja_node_get_left(node_flag, JA_ENTRY_PER_NODE);
+}
+
static
int ja_linear_node_set_nth(const struct cds_ja_type *type,
struct cds_ja_inode *node,
return ret;
}
-struct cds_hlist_head cds_ja_lookup(struct cds_ja *ja, uint64_t key)
+struct cds_ja_node *cds_ja_lookup(struct cds_ja *ja, uint64_t key)
{
unsigned int tree_depth, i;
struct cds_ja_inode_flag *node_flag;
- struct cds_hlist_head head = { NULL };
if (caa_unlikely(key > ja->key_max))
- return head;
+ return NULL;
tree_depth = ja->tree_depth;
node_flag = rcu_dereference(ja->root);
/* level 0: root node */
if (!ja_node_ptr(node_flag))
- return head;
+ return NULL;
for (i = 1; i < tree_depth; i++) {
uint8_t iter_key;
dbg_printf("cds_ja_lookup iter key lookup %u finds node_flag %p\n",
(unsigned int) iter_key, node_flag);
if (!ja_node_ptr(node_flag))
- return head;
+ return NULL;
}
/* Last level lookup succeded. We got an actual match. */
- head.next = (struct cds_hlist_node *) node_flag;
- return head;
+ return (struct cds_ja_node *) node_flag;
+}
+
+struct cds_ja_node *cds_ja_lookup_lower_equal(struct cds_ja *ja, uint64_t key)
+{
+ int tree_depth, level;
+ struct cds_ja_inode_flag *node_flag, *cur_node_depth[JA_MAX_DEPTH];
+
+ if (caa_unlikely(key > ja->key_max || !key))
+ return NULL;
+
+ memset(cur_node_depth, 0, sizeof(cur_node_depth));
+ tree_depth = ja->tree_depth;
+ node_flag = rcu_dereference(ja->root);
+ cur_node_depth[0] = node_flag;
+
+ /* level 0: root node */
+ if (!ja_node_ptr(node_flag))
+ return NULL;
+
+ for (level = 1; level < tree_depth; level++) {
+ uint8_t iter_key;
+
+ iter_key = (uint8_t) (key >> (JA_BITS_PER_BYTE * (tree_depth - level - 1)));
+ node_flag = ja_node_get_nth(node_flag, NULL, iter_key);
+ if (!ja_node_ptr(node_flag))
+ break;
+ cur_node_depth[level] = node_flag;
+ dbg_printf("cds_ja_lookup iter key lookup %u finds node_flag %p\n",
+ (unsigned int) iter_key, node_flag);
+ }
+
+ if (level == tree_depth) {
+ /* Last level lookup succeded. We got an equal match. */
+ return (struct cds_ja_node *) node_flag;
+ }
+
+ /*
+ * Find highest value left of current node.
+ * Current node is cur_node_depth[level].
+ * Start at current level. If we cannot find any key left of
+ * ours, go one level up, seek highest value left of current
+ * (recursively), and when we find one, get the rightmost child
+ * of its rightmost child (recursively).
+ */
+ for (; level > 0; level--) {
+ uint8_t iter_key;
+
+ iter_key = (uint8_t) (key >> (JA_BITS_PER_BYTE * (tree_depth - level - 1)));
+ node_flag = ja_node_get_left(cur_node_depth[level - 1],
+ iter_key);
+ /* If found left sibling, find rightmost child. */
+ if (ja_node_ptr(node_flag))
+ break;
+ }
+
+ if (!level) {
+ /* Reached the root and could not find a left sibling. */
+ return NULL;
+ }
+
+ level++;
+
+ /*
+ * From this point, we are guaranteed to be able to find a
+ * "lower than" match. ja_attach_node() and ja_detach_node()
+ * both guarantee that it is not possible for a lookup to reach
+ * a dead-end.
+ */
+
+ /* Find rightmost child of rightmost child (recursively). */
+ for (; level < tree_depth; level++) {
+ node_flag = ja_node_get_rightmost(node_flag);
+ /* If found left sibling, find rightmost child. */
+ if (!ja_node_ptr(node_flag))
+ break;
+ }
+
+ assert(level == tree_depth);
+
+ return (struct cds_ja_node *) node_flag;
}
/*
* parent lock (if needed). Then we can proceed to create the new
* branch. Publish the new branch, and release locks.
* TODO: we currently always take the parent lock even when not needed.
+ *
+ * ja_attach_node() ensures that a lookup will _never_ see a branch that
+ * leads to a dead-end: before attaching a branch, the entire content of
+ * the new branch is populated, thus creating a cluster, before
+ * attaching the cluster to the rest of the tree, thus making it visible
+ * to lookups.
*/
static
int ja_attach_node(struct cds_ja *ja,
{
struct cds_ja_shadow_node *shadow_node = NULL,
*parent_shadow_node = NULL;
- struct cds_hlist_head head;
struct cds_ja_inode_flag *iter_node_flag, *iter_dest_node_flag;
int ret, i;
struct cds_ja_inode_flag *created_nodes[JA_MAX_DEPTH];
}
/* Create new branch, starting from bottom */
- CDS_INIT_HLIST_HEAD(&head);
- cds_hlist_add_head_rcu(&child_node->list, &head);
- iter_node_flag = (struct cds_ja_inode_flag *) head.next;
+ iter_node_flag = (struct cds_ja_inode_flag *) child_node;
for (i = ja->tree_depth - 1; i >= (int) level; i--) {
uint8_t iter_key;
}
/*
- * Lock the parent containing the hlist head pointer, and add node to list of
- * duplicates. Failure can happen if concurrent update changes the
- * parent before we get the lock. We return -EAGAIN in that case.
+ * Lock the parent containing the pointer to list of duplicates, and add
+ * node to this list. Failure can happen if concurrent update changes
+ * the parent before we get the lock. We return -EAGAIN in that case.
* Return 0 on success, negative error value on failure.
*/
static
ret = -EAGAIN;
goto end;
}
- cds_hlist_add_head_rcu(&node->list, (struct cds_hlist_head *) node_flag_ptr);
+ /*
+ * Add node to head of list. Safe against concurrent RCU read
+ * traversals.
+ */
+ node->next = (struct cds_ja_node *) node_flag;
+ rcu_assign_pointer(*node_flag_ptr, (struct cds_ja_inode_flag *) node);
end:
rcuja_shadow_unlock(shadow_node);
return ret;
* However, when a child is removed from "linear" nodes, its pointer
* is set to NULL. We therefore check, while holding the locks, if this
* pointer is NULL, and return -ENOENT to the caller if it is the case.
+ *
+ * ja_detach_node() ensures that a lookup will _never_ see a branch that
+ * leads to a dead-end: when removing branch, it makes sure to perform
+ * the "cut" at the highest node that has only one child, effectively
+ * replacing it with a NULL pointer.
*/
static
int ja_detach_node(struct cds_ja *ja,
struct cds_ja_node *node)
{
struct cds_ja_shadow_node *shadow_node;
- struct cds_hlist_node *hlist_node;
- struct cds_hlist_head hlist_head;
+ struct cds_ja_node *iter_node, **iter_node_ptr, **prev_node_ptr = NULL;
int ret = 0, count = 0, found = 0;
shadow_node = rcuja_shadow_lookup_lock(ja->ht, parent_node_flag);
ret = -EAGAIN;
goto end;
}
- hlist_head.next = (struct cds_hlist_node *) ja_node_ptr(node_flag);
/*
- * Retry if another thread removed all but one of duplicates
- * since check (this check was performed without lock).
- * Ensure that the node we are about to remove is still in the
- * list (while holding lock).
+ * Find the previous node's next pointer pointing to our node,
+ * so we can update it. Retry if another thread removed all but
+ * one of duplicates since check (this check was performed
+ * without lock). Ensure that the node we are about to remove is
+ * still in the list (while holding lock). No need for RCU
+ * traversal here since we hold the lock on the parent.
*/
- cds_hlist_for_each_rcu(hlist_node, &hlist_head) {
- if (count == 0) {
- /* FIXME: currently a work-around */
- hlist_node->prev = (struct cds_hlist_node *) node_flag_ptr;
- }
+ iter_node_ptr = (struct cds_ja_node **) node_flag_ptr;
+ iter_node = (struct cds_ja_node *) ja_node_ptr(node_flag);
+ cds_ja_for_each_duplicate(iter_node) {
count++;
- if (hlist_node == &node->list)
+ if (iter_node == node) {
+ prev_node_ptr = iter_node_ptr;
found++;
+ }
+ iter_node_ptr = &iter_node->next;
}
assert(found <= 1);
if (!found || count == 1) {
ret = -EAGAIN;
goto end;
}
- cds_hlist_del_rcu(&node->list);
+ CMM_STORE_SHARED(*prev_node_ptr, node->next);
/*
* Validate that we indeed removed the node from linked list.
*/
key);
return -ENOENT;
} else {
- struct cds_hlist_head hlist_head;
- struct cds_hlist_node *hlist_node;
- struct cds_ja_node *entry, *match = NULL;
+ struct cds_ja_node *iter_node, *match = NULL;
int count = 0;
- hlist_head.next =
- (struct cds_hlist_node *) ja_node_ptr(node_flag);
- cds_hlist_for_each_entry_rcu(entry,
- hlist_node,
- &hlist_head,
- list) {
- dbg_printf("cds_ja_del: compare %p with entry %p\n", node, entry);
- if (entry == node)
- match = entry;
+ iter_node = (struct cds_ja_node *) ja_node_ptr(node_flag);
+ cds_ja_for_each_duplicate_rcu(iter_node) {
+ dbg_printf("cds_ja_del: compare %p with iter_node %p\n", node, iter_node);
+ if (iter_node == node)
+ match = iter_node;
count++;
}
+
if (!match) {
dbg_printf("cds_ja_del: no node match for node %p key %" PRIu64 "\n", node, key);
return -ENOENT;
__attribute__((visibility("protected")))
void rcuja_free_all_children(struct cds_ja_shadow_node *shadow_node,
struct cds_ja_inode_flag *node_flag,
- void (*free_node_cb)(struct rcu_head *head))
+ void (*rcu_free_node)(struct cds_ja_node *node))
{
- const struct rcu_flavor_struct *flavor;
unsigned int type_index;
struct cds_ja_inode *node;
const struct cds_ja_type *type;
- flavor = cds_lfht_rcu_flavor(shadow_node->ja->ht);
node = ja_node_ptr(node_flag);
assert(node != NULL);
type_index = ja_node_type(node_flag);
for (i = 0; i < nr_child; i++) {
struct cds_ja_inode_flag *iter;
- struct cds_hlist_head head;
- struct cds_ja_node *entry;
- struct cds_hlist_node *pos, *tmp;
+ struct cds_ja_node *node_iter, *n;
uint8_t v;
ja_linear_node_get_ith_pos(type, node, i, &v, &iter);
- if (!iter)
- continue;
- head.next = (struct cds_hlist_node *) iter;
- cds_hlist_for_each_entry_safe(entry, pos, tmp, &head, list) {
- flavor->update_call_rcu(&entry->head, free_node_cb);
+ node_iter = (struct cds_ja_node *) iter;
+ cds_ja_for_each_duplicate_safe(node_iter, n) {
+ rcu_free_node(node_iter);
}
}
break;
for (j = 0; j < nr_child; j++) {
struct cds_ja_inode_flag *iter;
- struct cds_hlist_head head;
- struct cds_ja_node *entry;
- struct cds_hlist_node *pos, *tmp;
+ struct cds_ja_node *node_iter, *n;
uint8_t v;
ja_linear_node_get_ith_pos(type, pool, j, &v, &iter);
- if (!iter)
- continue;
- head.next = (struct cds_hlist_node *) iter;
- cds_hlist_for_each_entry_safe(entry, pos, tmp, &head, list) {
- flavor->update_call_rcu(&entry->head, free_node_cb);
+ node_iter = (struct cds_ja_node *) iter;
+ cds_ja_for_each_duplicate_safe(node_iter, n) {
+ rcu_free_node(node_iter);
}
}
}
for (i = 0; i < JA_ENTRY_PER_NODE; i++) {
struct cds_ja_inode_flag *iter;
- struct cds_hlist_head head;
- struct cds_ja_node *entry;
- struct cds_hlist_node *pos, *tmp;
+ struct cds_ja_node *node_iter, *n;
iter = ja_pigeon_node_get_ith_pos(type, node, i);
- if (!iter)
- continue;
- head.next = (struct cds_hlist_node *) iter;
- cds_hlist_for_each_entry_safe(entry, pos, tmp, &head, list) {
- flavor->update_call_rcu(&entry->head, free_node_cb);
+ node_iter = (struct cds_ja_node *) iter;
+ cds_ja_for_each_duplicate_safe(node_iter, n) {
+ rcu_free_node(node_iter);
}
}
break;
}
static
-void print_ja_debug_info(struct cds_ja *ja)
+int ja_final_checks(struct cds_ja *ja)
{
double fallback_ratio;
unsigned long na, nf, nr_fallback;
+ int ret = 0;
fallback_ratio = (double) uatomic_read(&ja->nr_fallback);
fallback_ratio /= (double) uatomic_read(&ja->nr_nodes_allocated);
na = uatomic_read(&ja->nr_nodes_allocated);
nf = uatomic_read(&ja->nr_nodes_freed);
+ dbg_printf("Nodes allocated: %lu, Nodes freed: %lu.\n", na, nf);
+ if (nr_fallback)
+ print_debug_fallback_distribution(ja);
+
if (na != nf) {
fprintf(stderr, "[error] Judy array leaked %ld nodes. Allocated: %lu, freed: %lu.\n",
(long) na - nf, na, nf);
+ ret = -1;
}
- dbg_printf("Nodes allocated: %lu, Nodes freed: %lu.\n", na, nf);
- if (nr_fallback)
- print_debug_fallback_distribution(ja);
+ return ret;
}
/*
* being destroyed (ensured by the caller).
*/
int cds_ja_destroy(struct cds_ja *ja,
- void (*free_node_cb)(struct rcu_head *head))
+ void (*rcu_free_node)(struct cds_ja_node *node))
{
const struct rcu_flavor_struct *flavor;
int ret;
flavor = cds_lfht_rcu_flavor(ja->ht);
rcuja_shadow_prune(ja->ht,
RCUJA_SHADOW_CLEAR_FREE_NODE | RCUJA_SHADOW_CLEAR_FREE_LOCK,
- free_node_cb);
+ rcu_free_node);
flavor->thread_offline();
ret = rcuja_delete_ht(ja->ht);
if (ret)
flavor->barrier();
flavor->thread_online();
- print_ja_debug_info(ja);
+ ret = ja_final_checks(ja);
free(ja);
- return 0;
+ return ret;
}