* To discuss these guarantees, we first define a "read" operation as any
* of the basic cds_lfht_lookup, cds_lfht_next_duplicate,
* cds_lfht_first, cds_lfht_next operations, as well as
- * cds_lfht_add_unique (failure).
+ * cds_lfht_add_unique (failure).
*
* We define a "read traversal" operation as any of the following
* groups of operations
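*
* For example, one such group (a hypothetical caller-side sketch; ht,
* hash, match, key, iter and use_node are assumed to be provided by
* the caller) is a lookup followed by iteration over duplicates:
*
*	rcu_read_lock();
*	cds_lfht_lookup(ht, hash, match, key, &iter);
*	while (cds_lfht_iter_get_node(&iter) != NULL) {
*		use_node(cds_lfht_iter_get_node(&iter));
*		cds_lfht_next_duplicate(ht, match, key, &iter);
*	}
*	rcu_read_unlock();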
* shrink hash table from order 6 to 5: fini the index=6 bucket node table
*
* A bit of ascii art explanation:
- *
+ *
* The order index is off by one compared to the actual power of 2,
* because we use index 0 to deal with the 0 special case.
- *
+ *
* This shows the nodes for a small table ordered by reversed bits:
- *
+ *
*    bits   reverse
* 0  000        000
* 4  100        001
* 2  010        010
* 6  110        011
* 1  001        100
* 5  101        101
* 3  011        110
* 7  111        111
- *
- * This shows the nodes in order of non-reversed bits, linked by
+ *
+ * This shows the nodes in order of non-reversed bits, linked by
* reversed-bit order.
- *
+ *
* order              bits       reverse
* 0               0  000        000
* 1               |  1  001        100             <-
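*
* A quick way to reproduce the reversed-bit ordering of the first table
* (a hypothetical standalone sketch, not part of this file):
*
*	static unsigned int rev3(unsigned int v)
*	{
*		return ((v & 1) << 2) | (v & 2) | ((v >> 2) & 1);
*	}
*
* Sorting 0..7 by rev3() yields 0, 4, 2, 6, 1, 5, 3, 7, which matches
* the row order of the first table.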
#include <sched.h>
#include "config.h"
+#include "compat-getcpu.h"
#include <urcu.h>
#include <urcu-call-rcu.h>
#include <urcu-flavor.h>
* Originally from the public domain.
*/
-static const uint8_t BitReverseTable256[256] =
+static const uint8_t BitReverseTable256[256] =
{
#define R2(n) (n), (n) + 2*64, (n) + 1*64, (n) + 3*64
#define R4(n) R2(n), R2((n) + 2*16), R2((n) + 1*16), R2((n) + 3*16)
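/*
 * The R2/R4 macros (completed by a similar R6 level and an
 * R6(0), R6(2), R6(1), R6(3) expansion elsewhere in this file) unroll
 * into the 256 bit-reversed byte values, e.g.
 * BitReverseTable256[0x01] == 0x80 and BitReverseTable256[0x80] == 0x01.
 */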
static
uint32_t bit_reverse_u32(uint32_t v)
{
- return ((uint32_t) bit_reverse_u8(v) << 24) |
- ((uint32_t) bit_reverse_u8(v >> 8) << 16) |
- ((uint32_t) bit_reverse_u8(v >> 16) << 8) |
+ return ((uint32_t) bit_reverse_u8(v) << 24) |
+ ((uint32_t) bit_reverse_u8(v >> 8) << 16) |
+ ((uint32_t) bit_reverse_u8(v >> 16) << 8) |
((uint32_t) bit_reverse_u8(v >> 24));
}
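/* For instance, bit_reverse_u32(0x00000001) == 0x80000000. */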
#else
static
uint64_t bit_reverse_u64(uint64_t v)
{
- return ((uint64_t) bit_reverse_u8(v) << 56) |
- ((uint64_t) bit_reverse_u8(v >> 8) << 48) |
+ return ((uint64_t) bit_reverse_u8(v) << 56) |
+ ((uint64_t) bit_reverse_u8(v >> 8) << 48) |
((uint64_t) bit_reverse_u8(v >> 16) << 40) |
((uint64_t) bit_reverse_u8(v >> 24) << 32) |
- ((uint64_t) bit_reverse_u8(v >> 32) << 24) |
- ((uint64_t) bit_reverse_u8(v >> 40) << 16) |
+ ((uint64_t) bit_reverse_u8(v >> 32) << 24) |
+ ((uint64_t) bit_reverse_u8(v >> 40) << 16) |
((uint64_t) bit_reverse_u8(v >> 48) << 8) |
((uint64_t) bit_reverse_u8(v >> 56));
}
{
int r;
- asm("bsrl %1,%0\n\t"
+ __asm__ ("bsrl %1,%0\n\t"
"jnz 1f\n\t"
"movl $-1,%0\n\t"
"1:\n\t"
{
long r;
- asm("bsrq %1,%0\n\t"
+ __asm__ ("bsrq %1,%0\n\t"
"jnz 1f\n\t"
"movq $-1,%0\n\t"
"1:\n\t"
static long nr_cpus_mask = -1;
static long split_count_mask = -1;
+static int split_count_order = -1;
#if defined(HAVE_SYSCONF)
static void ht_init_nr_cpus_mask(void)
split_count_mask = DEFAULT_SPLIT_COUNT_MASK;
else
split_count_mask = nr_cpus_mask;
+ split_count_order =
+ cds_lfht_get_count_order_ulong(split_count_mask + 1);
}
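/*
 * Example with hypothetical values: on a 4-CPU system, split_count_mask
 * is 3 and split_count_order is 2, since
 * cds_lfht_get_count_order_ulong(4) == 2.
 */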
assert(split_count_mask >= 0);
poison_free(ht->split_count);
}
-#if defined(HAVE_SCHED_GETCPU)
static
int ht_get_split_count_index(unsigned long hash)
{
int cpu;
assert(split_count_mask >= 0);
- cpu = sched_getcpu();
+ cpu = urcu_sched_getcpu();
if (caa_unlikely(cpu < 0))
return hash & split_count_mask;
else
return cpu & split_count_mask;
}
-#else /* #if defined(HAVE_SCHED_GETCPU) */
-static
-int ht_get_split_count_index(unsigned long hash)
-{
- return hash & split_count_mask;
-}
-#endif /* #else #if defined(HAVE_SCHED_GETCPU) */
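/*
 * Note: urcu_sched_getcpu() (from compat-getcpu.h) is assumed to return
 * a negative value when sched_getcpu() is not available, so the
 * hash-based fallback above now covers what the removed #ifdef branch
 * used to handle.
 */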
static
void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
* Use bucket-local length for small table expand and for
* environments lacking per-cpu data support.
*/
- if (count >= (1UL << COUNT_COMMIT_ORDER))
+ if (count >= (1UL << (COUNT_COMMIT_ORDER + split_count_order)))
return;
if (chain_len > 100)
dbg_printf("WARNING: large chain length: %u.\n",
chain_len);
- if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD)
- cds_lfht_resize_lazy_grow(ht, size,
- cds_lfht_get_count_order_u32(chain_len - (CHAIN_LEN_TARGET - 1)));
+ if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD) {
+ int growth;
+
+ /*
+ * Ideal growth calculated based on chain length.
+ */
+ growth = cds_lfht_get_count_order_u32(chain_len
+ - (CHAIN_LEN_TARGET - 1));
+ if ((ht->flags & CDS_LFHT_ACCOUNTING)
+ && (size << growth)
+ >= (1UL << (COUNT_COMMIT_ORDER
+ + split_count_order))) {
+ /*
+ * If ideal growth expands the hash table size
+ * beyond the "small hash table" sizes, use the
+ * maximum small hash table size to attempt
+ * expanding the hash table. This only applies
+ * when node accounting is available, otherwise
+ * the chain length is used to expand the hash
+ * table in every case.
+ */
+ growth = COUNT_COMMIT_ORDER + split_count_order
+ - cds_lfht_get_count_order_ulong(size);
+ if (growth <= 0)
+ return;
+ }
+ cds_lfht_resize_lazy_grow(ht, size, growth);
+ }
}
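/*
 * Worked example with hypothetical values COUNT_COMMIT_ORDER == 10 and
 * split_count_order == 2: tables of up to 1UL << 12 buckets are "small".
 * With size == 1024 and an ideal growth of 4 orders (1024 << 4 == 16384,
 * which is >= 4096), the growth is capped at 12 - 10 == 2 orders.
 */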
static
void (*fct)(struct cds_lfht *ht, unsigned long i,
unsigned long start, unsigned long len))
{
- unsigned long partition_len;
+ unsigned long partition_len, start = 0;
struct partition_resize_work *work;
int thread, ret;
unsigned long nr_threads;
+ assert(nr_cpus_mask != -1);
+ if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
+ goto fallback;
+
/*
* Note: nr_cpus_mask + 1 is always a power of 2.
* We spawn just the number of threads we need to satisfy the minimum
}
partition_len = len >> cds_lfht_get_count_order_ulong(nr_threads);
work = calloc(nr_threads, sizeof(*work));
- assert(work);
+ if (!work) {
+ dbg_printf("error allocating for resize, single-threading\n");
+ goto fallback;
+ }
for (thread = 0; thread < nr_threads; thread++) {
work[thread].ht = ht;
work[thread].i = i;
work[thread].fct = fct;
ret = pthread_create(&(work[thread].thread_id), ht->resize_attr,
partition_resize_thread, &work[thread]);
+ if (ret == EAGAIN) {
+ /*
+ * Out of resources: wait and join the threads
+ * we've created, then handle leftovers.
+ */
+ dbg_printf("error spawning for resize, single-threading\n");
+ start = work[thread].start;
+ len -= start;
+ nr_threads = thread;
+ break;
+ }
assert(!ret);
}
for (thread = 0; thread < nr_threads; thread++) {
assert(!ret);
}
free(work);
+
+ /*
+ * A pthread_create failure above will either leave us with no
+ * threads to join or have us start at a non-zero offset; fall
+ * back to single-threaded processing of the leftovers.
+ */
+ if (start == 0 && nr_threads > 0)
+ return;
+fallback:
+ ht->flavor->thread_online();
+ fct(ht, i, start, len);
+ ht->flavor->thread_offline();
}
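/*
 * Note that the fallback label above is reached in three cases: unknown
 * CPU count or a range too small to partition, calloc failure, and
 * pthread_create failing with EAGAIN. In each case the remaining
 * [start, start + len) range is processed from the caller's thread.
 */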
/*
void init_table_populate(struct cds_lfht *ht, unsigned long i,
unsigned long len)
{
- assert(nr_cpus_mask != -1);
- if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
- ht->flavor->thread_online();
- init_table_populate_partition(ht, i, 0, len);
- ht->flavor->thread_offline();
- return;
- }
partition_resize_helper(ht, i, len, init_table_populate_partition);
}
static
void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
{
-
- assert(nr_cpus_mask != -1);
- if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
- ht->flavor->thread_online();
- remove_table_partition(ht, i, 0, len);
- ht->flavor->thread_offline();
- return;
- }
partition_resize_helper(ht, i, len, remove_table_partition);
}
unsigned long len;
len = 1UL << (i - 1);
- dbg_printf("fini order %lu len: %lu\n", i, len);
+ dbg_printf("fini order %ld len: %lu\n", i, len);
/* Stop shrink if the resize target changes under us */
if (CMM_LOAD_SHARED(ht->resize_target) > (1UL << (i - 1)))
*/
int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
{
- int ret;
+ int ret, was_online;
/* Wait for in-flight resize operations to complete */
_CMM_STORE_SHARED(ht->in_progress_destroy, 1);
cmm_smp_mb(); /* Store destroy before load resize */
- ht->flavor->thread_offline();
+ was_online = ht->flavor->read_ongoing();
+ if (was_online)
+ ht->flavor->thread_offline();
+ /* Calling with RCU read-side held is an error. */
+ if (ht->flavor->read_ongoing()) {
+ ret = -EINVAL;
+ if (was_online)
+ ht->flavor->thread_online();
+ goto end;
+ }
while (uatomic_read(&ht->in_progress_resize))
poll(NULL, 0, 100); /* wait for 100ms */
- ht->flavor->thread_online();
+ if (was_online)
+ ht->flavor->thread_online();
ret = cds_lfht_delete_bucket(ht);
if (ret)
return ret;
if (attr)
*attr = ht->resize_attr;
poison_free(ht);
+end:
return ret;
}
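/*
 * A minimal teardown sketch (hypothetical caller code; node removal and
 * deferred freeing are assumed to have been handled already):
 *
 *	int ret;
 *
 *	// Must not be called from within an RCU read-side critical section.
 *	ret = cds_lfht_destroy(ht, NULL);
 *	if (ret)
 *		fprintf(stderr, "destroy failed: %d\n", ret);
 */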
void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
{
+ int was_online;
+
+ was_online = ht->flavor->read_ongoing();
+ if (was_online)
+ ht->flavor->thread_offline();
+ /* Calling with RCU read-side held is an error. */
+ if (ht->flavor->read_ongoing()) {
+ static int print_once;
+
+ if (!CMM_LOAD_SHARED(print_once))
+ fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
+ "called with RCU read-side lock held.\n");
+ CMM_STORE_SHARED(print_once, 1);
+ assert(0);
+ goto end;
+ }
resize_target_update_count(ht, new_size);
CMM_STORE_SHARED(ht->resize_initiated, 1);
- ht->flavor->thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
- ht->flavor->thread_online();
+end:
+ if (was_online)
+ ht->flavor->thread_online();
}
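/*
 * A minimal resize usage sketch (hypothetical caller code):
 *
 *	// Must not hold the RCU read-side lock here.
 *	cds_lfht_resize(ht, 4096);	// grow or shrink toward 4096 buckets
 */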
static