X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=rcuja%2Frcuja-range.c;h=26a0c2e97b50e8d58987913772c25d93a29cfc1e;hb=refs%2Fheads%2Furcu%2Frcuja-range;hp=faf60ce48feb86335a669d4de17973f2a2e93ad0;hpb=dd1da0cb783e029bc65ebcbc1e2681281d0b6e1f;p=userspace-rcu.git

diff --git a/rcuja/rcuja-range.c b/rcuja/rcuja-range.c
index faf60ce..26a0c2e 100644
--- a/rcuja/rcuja-range.c
+++ b/rcuja/rcuja-range.c
@@ -80,6 +80,30 @@
  * space.
  */
 
+/*
+ * Discussion: concurrent lookup vs add
+ *
+ * When executed concurrently with node add, the inequality
+ * lookup can see no node for the looked-up range, because a range can
+ * be shrunk. This can happen if, for instance, we lookup key 2
+ * between addition of a "free" range for values [1,2], and removal of
+ * the old "free" range for values [0,2]. We would then fail to observe
+ * any range for key 2. Given that the lookup is performed during a
+ * range transition, we can safely return that there is no allocated
+ * node in the range.
+ */
+
+/*
+ * Discussion: concurrent lookup vs del
+ *
+ * There is no special case for lookups performed concurrently with node
+ * del, because node del either replaces the node with the exact same
+ * start key (see duplicates guarantees), or replaces it with a larger
+ * range containing the prior range. Therefore, we are sure that
+ * inequality lookups will see the larger range before the old range is
+ * deleted, in whichever direction the lookup is performed.
+ */
+
 /*
  * Discussion of the type state transitions.
  *
@@ -92,7 +116,25 @@
  * A range type never changes otherwise.
  */
 
-#define CDS_JA_RANGE_KEY_BITS	64
+//#define RANGE_DEBUG
+
+#undef dbg_printf
+
+#ifdef RANGE_DEBUG
+#define dbg_printf(fmt, args...)					\
+	fprintf(stderr, "[debug rcuja-range %lu %s()@%s:%u] " fmt,	\
+		(unsigned long) gettid(), __func__,			\
+		__FILE__, __LINE__, ## args)
+#else
+#define dbg_printf(fmt, args...)					\
+do {									\
+	/* do nothing but check printf format */			\
+	if (0)								\
+		fprintf(stderr, "[debug rcuja-range %lu %s()@%s:%u] " fmt, \
+			(unsigned long) gettid(), __func__,		\
+			__FILE__, __LINE__, ## args);			\
+} while (0)
+#endif
 
 enum cds_ja_range_type {
 	CDS_JA_RANGE_ALLOCATED,
@@ -121,8 +163,10 @@ struct cds_ja_range *cds_ja_range_lookup(struct cds_ja *ja, uint64_t key)
 	struct cds_ja_node *node, *last_node;
 	struct cds_ja_range *range;
 
+	dbg_printf("key: %" PRIu64 "\n", key);
 	node = cds_ja_lookup_below_equal(ja, key, NULL);
-	assert(node);
+	if (!node)
+		return NULL;
 	/*
 	 * Get the last of duplicate chain. Adding a node to Judy array
 	 * duplicates inserts them at the end of the chain.
@@ -130,6 +174,11 @@ struct cds_ja_range *cds_ja_range_lookup(struct cds_ja *ja, uint64_t key)
 	cds_ja_for_each_duplicate_rcu(node)
 		last_node = node;
 	range = caa_container_of(last_node, struct cds_ja_range, ja_node);
+
+	/* Check if range is currently hidden by concurrent add */
+	if (range->end < key)
+		return NULL;
+
 	/*
 	 * If last node in the duplicates is removed or free, we can
 	 * consider that either a removal or add operation is in
@@ -168,6 +217,14 @@ void cds_ja_range_unlock(struct cds_ja_range *range)
 	pthread_mutex_unlock(&range->lock);
 }
 
+void cds_ja_range_get_values(const struct cds_ja_range *range,
+		uint64_t *start, uint64_t *end, void **priv)
+{
+	*start = range->start;
+	*end = range->end;
+	*priv = range->priv;
+}
+
 static
 struct cds_ja_range *range_create(
 		uint64_t start,		/* inclusive */
@@ -209,47 +266,50 @@ void rcu_free_range(struct cds_ja *ja, struct cds_ja_range *range)
 		free_range_cb);
 }
 
-struct cds_ja_range *cds_ja_range_add(struct cds_ja *ja,
+int cds_ja_range_add(struct cds_ja *ja,
 		uint64_t start,		/* inclusive */
 		uint64_t end,		/* inclusive */
 		void *priv)
 {
-	struct cds_ja_node *old_node, *old_node_end;
-	struct cds_ja_range *old_range, *old_range_end, *new_range, *ranges[3];
+	struct cds_ja_node *old_node;
+	struct cds_ja_range *old_range, *new_range, *ranges[3];
 	unsigned int nr_ranges, i;
 	int ret;
 
+	if (start > end || end == UINT64_MAX || end > ja->key_max)
+		return -EINVAL;
+
 retry:
+	dbg_printf("start: %" PRIu64 ", end: %" PRIu64 ", priv %p\n",
+		start, end, priv);
 	/*
 	 * Find if requested range is entirely contained within a single
 	 * free range.
 	 */
 	old_node = cds_ja_lookup_below_equal(ja, start, NULL);
-	assert(old_node);
+	/* Range hidden by concurrent add */
+	if (!old_node)
+		goto retry;
 
 	old_range = caa_container_of(old_node, struct cds_ja_range, ja_node);
+
+	/* Range hidden by concurrent add */
+	if (old_range->end < start)
+		goto retry;
+
+	/* We now know that old_range overlaps with our range */
 	switch (CMM_LOAD_SHARED(old_range->type)) {
 	case CDS_JA_RANGE_ALLOCATED:
-		return NULL;
+		return -EEXIST;
 	case CDS_JA_RANGE_FREE:
 		break;
 	case CDS_JA_RANGE_REMOVED:
 		goto retry;
 	}
 
-	old_node_end = cds_ja_lookup_below_equal(ja, end, NULL);
-	assert(old_node_end);
-	old_range_end = caa_container_of(old_node_end,
-			struct cds_ja_range, ja_node);
-	if (old_range_end != old_range) {
-		switch (CMM_LOAD_SHARED(old_range->type)) {
-		case CDS_JA_RANGE_ALLOCATED:
-		case CDS_JA_RANGE_FREE:		/* fall-through */
-			return NULL;
-		case CDS_JA_RANGE_REMOVED:
-			goto retry;
-		}
-	}
+	/* We do not fit entirely within the range */
+	if (old_range->end < end)
+		return -EEXIST;
 
 	pthread_mutex_lock(&old_range->lock);
 
@@ -267,6 +327,7 @@ retry:
 		nr_ranges = 1;
 	} else {
 		/* 2 ranges */
+		assert(old_range->end > end);
 		ranges[0] = new_range = range_create(start, end,
 			priv, CDS_JA_RANGE_ALLOCATED);
 		ranges[1] = range_create(end + 1, old_range->end,
@@ -276,6 +337,7 @@ retry:
 	} else {
 		if (end == old_range->end) {
 			/* 2 ranges */
+			assert(old_range->start < start);
 			ranges[0] = range_create(old_range->start, start - 1,
 				NULL, CDS_JA_RANGE_FREE);
 			ranges[1] = new_range = range_create(start, end,
@@ -283,6 +345,8 @@ retry:
 			nr_ranges = 2;
 		} else {
 			/* 3 ranges */
+			assert(old_range->start < start);
+			assert(old_range->end > end);
 			ranges[0] = range_create(old_range->start, start - 1,
 				NULL, CDS_JA_RANGE_FREE);
 			ranges[1] = new_range = range_create(start, end,
@@ -295,6 +359,11 @@ retry:
 
 	/* Add replacement ranges to Judy array */
 	for (i = 0; i < nr_ranges; i++) {
+		dbg_printf("ADD RANGE: %" PRIu64 "-%" PRIu64 " %s.\n",
+			ranges[i]->start, ranges[i]->end,
+			ranges[i]->type == CDS_JA_RANGE_ALLOCATED ?
+				"allocated" : "free");
+		pthread_mutex_lock(&ranges[i]->lock);
 		ret = cds_ja_add(ja, ranges[i]->start, &ranges[i]->ja_node);
 		assert(!ret);
 	}
@@ -307,15 +376,23 @@ retry:
 	 * concurrently with add followed by del of duplicate keys.
 	 */
 
+	dbg_printf("REM RANGE: %" PRIu64 "-%" PRIu64 " %s.\n",
+		old_range->start, old_range->end,
+		old_range->type == CDS_JA_RANGE_ALLOCATED ?
+			"allocated" : "free");
 	/* Remove old free range */
 	ret = cds_ja_del(ja, old_range->start, &old_range->ja_node);
 	assert(!ret);
 	old_range->type = CDS_JA_RANGE_REMOVED;
 	pthread_mutex_unlock(&old_range->lock);
+	for (i = 0; i < nr_ranges; i++)
+		pthread_mutex_unlock(&ranges[i]->lock);
 
 	rcu_free_range(ja, old_range);
 
-	return new_range;
+	dbg_printf("\n");
+
+	return 0;
 }
 
 int cds_ja_range_del(struct cds_ja *ja, struct cds_ja_range *range)
@@ -328,14 +405,32 @@ int cds_ja_range_del(struct cds_ja *ja, struct cds_ja_range *range)
 	int ret;
 
 retry:
+	dbg_printf("start: %" PRIu64 ", end %" PRIu64 ", priv: %p\n",
+		range->start, range->end, range->priv);
+
 	nr_merge = 0;
 	nr_lock = 0;
-	prev_node = cds_ja_lookup_below_equal(ja, range->start - 1, NULL);
-	if (prev_node) {
+
+	/*
+	 * Range has been concurrently updated.
+	 */
+	if (range->type != CDS_JA_RANGE_ALLOCATED)
+		return -ENOENT;
+
+	if (range->start > 0) {
 		struct cds_ja_range *prev_range;
 
+		prev_node = cds_ja_lookup_below_equal(ja, range->start - 1,
+			NULL);
+		if (!prev_node)
+			goto retry;
+
 		prev_range = caa_container_of(prev_node,
 			struct cds_ja_range, ja_node);
+		/* Prev range temporarily hidden due to concurrent add. */
+		if (prev_range->end != range->start - 1)
+			goto retry;
+
 		lock_ranges[nr_lock++] = prev_range;
 		if (prev_range->type != CDS_JA_RANGE_ALLOCATED)
 			merge_ranges[nr_merge++] = prev_range;
@@ -344,12 +439,20 @@ retry:
 	lock_ranges[nr_lock++] = range;
 	merge_ranges[nr_merge++] = range;
 
-	next_node = cds_ja_lookup_above_equal(ja, range->end + 1, NULL);
-	if (next_node) {
+	if (range->end < UINT64_MAX - 1) {
 		struct cds_ja_range *next_range;
 
+		next_node = cds_ja_lookup_below_equal(ja, range->end + 1,
+			NULL);
+		/* Next range temporarily hidden due to concurrent add. */
+		if (!next_node)
+			goto retry;
+
 		next_range = caa_container_of(next_node,
 			struct cds_ja_range, ja_node);
+		if (next_range->start != range->end + 1)
+			goto retry;
+
 		lock_ranges[nr_lock++] = next_range;
 		if (next_range->type != CDS_JA_RANGE_ALLOCATED)
 			merge_ranges[nr_merge++] = next_range;
@@ -358,6 +461,10 @@ retry:
 	/* Acquire locks in increasing key order for range merge */
 	for (i = 0; i < nr_lock; i++)
 		pthread_mutex_lock(&lock_ranges[i]->lock);
+	if (range->type != CDS_JA_RANGE_ALLOCATED) {
+		ret = -ENOENT;
+		goto unlock_error;
+	}
 	/* Ensure they are valid */
 	for (i = 0; i < nr_lock; i++) {
 		if (lock_ranges[i]->type == CDS_JA_RANGE_REMOVED)
@@ -368,11 +475,23 @@ retry:
 	start = merge_ranges[0]->start;
 	end = merge_ranges[nr_merge - 1]->end;
 	new_range = range_create(start, end, NULL, CDS_JA_RANGE_FREE);
+	pthread_mutex_lock(&new_range->lock);
+
+	dbg_printf("ADD RANGE: %" PRIu64 "-%" PRIu64 " %s.\n",
+		new_range->start, new_range->end,
+		new_range->type == CDS_JA_RANGE_ALLOCATED ?
+			"allocated" : "free");
+
 	ret = cds_ja_add(ja, start, &new_range->ja_node);
 	assert(!ret);
 
 	/* Remove old ranges */
 	for (i = 0; i < nr_merge; i++) {
+
+		dbg_printf("REM RANGE: %" PRIu64 "-%" PRIu64 " %s.\n",
+			merge_ranges[i]->start, merge_ranges[i]->end,
+			merge_ranges[i]->type == CDS_JA_RANGE_ALLOCATED ?
+				"allocated" : "free");
 		ret = cds_ja_del(ja, merge_ranges[i]->start,
 			&merge_ranges[i]->ja_node);
 		assert(!ret);
@@ -380,10 +499,13 @@ retry:
 	}
 	for (i = 0; i < nr_lock; i++)
 		pthread_mutex_unlock(&lock_ranges[i]->lock);
+	pthread_mutex_unlock(&new_range->lock);
 
 	/* Free old merged ranges */
 	for (i = 0; i < nr_merge; i++)
 		rcu_free_range(ja, merge_ranges[i]);
+	dbg_printf("\n");
+
 	return 0;
 
 	/* retry paths */
@@ -391,18 +513,24 @@ unlock_retry:
 	for (i = 0; i < nr_lock; i++)
 		pthread_mutex_unlock(&lock_ranges[i]->lock);
 	goto retry;
+	/* error paths */
+unlock_error:
+	for (i = 0; i < nr_lock; i++)
+		pthread_mutex_unlock(&lock_ranges[i]->lock);
+	return ret;
 }
 
-struct cds_ja *_cds_ja_range_new(const struct rcu_flavor_struct *flavor)
+struct cds_ja *_cds_ja_range_new(unsigned int key_bits,
+		const struct rcu_flavor_struct *flavor)
 {
 	struct cds_ja_range *range;
 	struct cds_ja *ja;
 	int ret;
 
-	ja = _cds_ja_new(CDS_JA_RANGE_KEY_BITS, flavor);
+	ja = _cds_ja_new(key_bits, flavor);
 	if (!ja)
 		return NULL;
-	range = range_create(0, UINT64_MAX, NULL, CDS_JA_RANGE_FREE);
+	range = range_create(0, UINT64_MAX - 1, NULL, CDS_JA_RANGE_FREE);
 	if (!range)
 		goto free_ja;
 	cds_lfht_rcu_flavor(ja->ht)->read_lock();
@@ -420,6 +548,51 @@ free_ja:
 	return NULL;
 }
 
+int cds_ja_range_validate(struct cds_ja *ja)
+{
+	uint64_t iter_key, start, end, last_end = UINT64_MAX;
+	struct cds_ja_node *ja_node, *last_node;
+	int ret = 0;
+
+	cds_lfht_rcu_flavor(ja->ht)->read_lock();
+	cds_ja_for_each_key_rcu(ja, iter_key, ja_node) {
+		struct cds_ja_range *range;
+		struct cds_ja_node *first_node;
+
+		first_node = ja_node;
+		cds_ja_for_each_duplicate_rcu(ja_node)
+			last_node = ja_node;
+		if (last_node != first_node) {
+			struct cds_ja_range *first_range = caa_container_of(first_node,
+				struct cds_ja_range, ja_node);
+			struct cds_ja_range *last_range = caa_container_of(last_node,
+				struct cds_ja_range, ja_node);
+			fprintf(stderr, "found duplicate node: first %" PRIu64 "-%" PRIu64 " last %" PRIu64 "-%" PRIu64 "\n",
+				first_range->start, first_range->end, last_range->start, last_range->end);
+			ret |= -1;
+		}
+		range = caa_container_of(last_node,
+			struct cds_ja_range, ja_node);
+		start = range->start;
+		end = range->end;
+		if (last_end != UINT64_MAX) {
+			if (start != last_end + 1) {
+				fprintf(stderr, "ja range discrepancy: last end: %" PRIu64 ", start: %" PRIu64 "\n",
+					last_end, start);
+				ret |= -1;
+			}
+		}
+		last_end = end;
+	}
+	if (last_end != UINT64_MAX - 1) {
+		fprintf(stderr, "ja range error: end of last range is: %" PRIu64 "\n",
+			last_end);
+		ret |= 1;
+	}
+	cds_lfht_rcu_flavor(ja->ht)->read_unlock();
+	return ret;
+}
+
 int cds_ja_range_destroy(struct cds_ja *ja,
 		void (*free_priv)(void *ptr))
 {