#include "config.h"
#include <urcu.h>
#include <urcu-call-rcu.h>
+#include <urcu-flavor.h>
#include <urcu/arch.h>
#include <urcu/uatomic.h>
#include <urcu/compiler.h>
struct rcu_table t;
unsigned long min_alloc_buckets_order;
unsigned long min_nr_alloc_buckets;
+ unsigned long max_nr_buckets;
int flags;
/*
* We need to put the work threads offline (QSBR) when taking this
*/
pthread_mutex_t resize_mutex; /* resize mutex: add/del mutex */
unsigned int in_progress_resize, in_progress_destroy;
- void (*cds_lfht_call_rcu)(struct rcu_head *head,
- void (*func)(struct rcu_head *head));
- void (*cds_lfht_synchronize_rcu)(void);
- void (*cds_lfht_rcu_read_lock)(void);
- void (*cds_lfht_rcu_read_unlock)(void);
- void (*cds_lfht_rcu_thread_offline)(void);
- void (*cds_lfht_rcu_thread_online)(void);
- void (*cds_lfht_rcu_register_thread)(void);
- void (*cds_lfht_rcu_unregister_thread)(void);
+ const struct rcu_flavor_struct *flavor;
pthread_attr_t *resize_attr; /* Resize threads attributes */
long count; /* global approximate item count */
struct ht_items_count *split_count; /* split item count */
{
struct partition_resize_work *work = arg;
- work->ht->cds_lfht_rcu_register_thread();
+ work->ht->flavor->register_thread();
work->fct(work->ht, work->i, work->start, work->len);
- work->ht->cds_lfht_rcu_unregister_thread();
+ work->ht->flavor->unregister_thread();
return NULL;
}
unsigned long j, size = 1UL << (i - 1);
assert(i > MIN_TABLE_ORDER);
- ht->cds_lfht_rcu_read_lock();
+ ht->flavor->read_lock();
for (j = size + start; j < size + start + len; j++) {
struct cds_lfht_node *new_node = bucket_at(ht, j);
new_node->reverse_hash = bit_reverse_ulong(j);
_cds_lfht_add(ht, NULL, NULL, size, new_node, NULL, 1);
}
- ht->cds_lfht_rcu_read_unlock();
+ ht->flavor->read_unlock();
}
static
{
assert(nr_cpus_mask != -1);
if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
init_table_populate_partition(ht, i, 0, len);
- ht->cds_lfht_rcu_thread_offline();
+ ht->flavor->thread_offline();
return;
}
partition_resize_helper(ht, i, len, init_table_populate_partition);
unsigned long j, size = 1UL << (i - 1);
assert(i > MIN_TABLE_ORDER);
- ht->cds_lfht_rcu_read_lock();
+ ht->flavor->read_lock();
for (j = size + start; j < size + start + len; j++) {
struct cds_lfht_node *fini_node = bucket_at(ht, j);
fini_node->reverse_hash = bit_reverse_ulong(j);
(void) _cds_lfht_del(ht, size, fini_node, 1);
}
- ht->cds_lfht_rcu_read_unlock();
+ ht->flavor->read_unlock();
}
static
assert(nr_cpus_mask != -1);
if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
remove_table_partition(ht, i, 0, len);
- ht->cds_lfht_rcu_thread_offline();
+ ht->flavor->thread_offline();
return;
}
partition_resize_helper(ht, i, len, remove_table_partition);
* releasing the old bucket nodes. Otherwise their lookup will
* return a logically removed node as insert position.
*/
- ht->cds_lfht_synchronize_rcu();
+ ht->flavor->update_synchronize_rcu();
if (free_by_rcu_order)
cds_lfht_free_bucket_table(ht, free_by_rcu_order);
}
if (free_by_rcu_order) {
- ht->cds_lfht_synchronize_rcu();
+ ht->flavor->update_synchronize_rcu();
cds_lfht_free_bucket_table(ht, free_by_rcu_order);
}
}
struct cds_lfht *_cds_lfht_new(unsigned long init_size,
unsigned long min_nr_alloc_buckets,
+ unsigned long max_nr_buckets,
int flags,
- void (*cds_lfht_call_rcu)(struct rcu_head *head,
- void (*func)(struct rcu_head *head)),
- void (*cds_lfht_synchronize_rcu)(void),
- void (*cds_lfht_rcu_read_lock)(void),
- void (*cds_lfht_rcu_read_unlock)(void),
- void (*cds_lfht_rcu_thread_offline)(void),
- void (*cds_lfht_rcu_thread_online)(void),
- void (*cds_lfht_rcu_register_thread)(void),
- void (*cds_lfht_rcu_unregister_thread)(void),
+ const struct rcu_flavor_struct *flavor,
pthread_attr_t *attr)
{
struct cds_lfht *ht;
/* min_nr_alloc_buckets must be power of two */
if (!min_nr_alloc_buckets || (min_nr_alloc_buckets & (min_nr_alloc_buckets - 1)))
return NULL;
+
/* init_size must be power of two */
if (!init_size || (init_size & (init_size - 1)))
return NULL;
+
+ if (!max_nr_buckets)
+ max_nr_buckets = 1UL << (MAX_TABLE_ORDER - 1);
+
+ /* max_nr_buckets must be power of two */
+ if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
+ return NULL;
+
min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
init_size = max(init_size, MIN_TABLE_SIZE);
+ max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
+ init_size = min(init_size, max_nr_buckets);
ht = calloc(1, sizeof(struct cds_lfht));
assert(ht);
ht->flags = flags;
- ht->cds_lfht_call_rcu = cds_lfht_call_rcu;
- ht->cds_lfht_synchronize_rcu = cds_lfht_synchronize_rcu;
- ht->cds_lfht_rcu_read_lock = cds_lfht_rcu_read_lock;
- ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
- ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
- ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
- ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread;
- ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread;
+ ht->flavor = flavor;
ht->resize_attr = attr;
alloc_split_items_count(ht);
/* this mutex should not nest in read-side C.S. */
ht->t.resize_target = 1UL << order;
ht->min_nr_alloc_buckets = min_nr_alloc_buckets;
ht->min_alloc_buckets_order = get_count_order_ulong(min_nr_alloc_buckets);
+ ht->max_nr_buckets = max_nr_buckets;
cds_lfht_create_bucket(ht, 1UL << order);
ht->t.size = 1UL << order;
return ht;
unsigned long count)
{
count = max(count, MIN_TABLE_SIZE);
+ count = min(count, ht->max_nr_buckets);
uatomic_set(&ht->t.resize_target, count);
}
{
resize_target_update_count(ht, new_size);
CMM_STORE_SHARED(ht->t.resize_initiated, 1);
- ht->cds_lfht_rcu_thread_offline();
+ ht->flavor->thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
}
static
caa_container_of(head, struct rcu_resize_work, head);
struct cds_lfht *ht = work->ht;
- ht->cds_lfht_rcu_thread_offline();
+ ht->flavor->thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
poison_free(work);
cmm_smp_mb(); /* finish resize before decrement */
uatomic_dec(&ht->in_progress_resize);
}
work = malloc(sizeof(*work));
work->ht = ht;
- ht->cds_lfht_call_rcu(&work->head, do_resize_cb);
+ ht->flavor->update_call_rcu(&work->head, do_resize_cb);
CMM_STORE_SHARED(ht->t.resize_initiated, 1);
}
}
{
unsigned long target_size = size << growth;
+ target_size = min(target_size, ht->max_nr_buckets);
if (resize_target_grow(ht, target_size) >= target_size)
return;
if (!(ht->flags & CDS_LFHT_AUTO_RESIZE))
return;
count = max(count, MIN_TABLE_SIZE);
+ count = min(count, ht->max_nr_buckets);
if (count == size)
return; /* Already the right size, no resize needed */
if (count > size) { /* lazy grow */