cds_lfht_compare_fct compare_fct;
unsigned long hash_seed;
int flags;
+ /*
+ * We need to put the worker threads offline (QSBR) when taking this
+ * mutex, because we use synchronize_rcu within this mutex critical
+ * section, which waits on read-side critical sections. A thread
+ * blocking on this mutex while still online would hold off RCU
+ * grace-period completion and deadlock against the mutex holder's
+ * synchronize_rcu. (See the sketch following this struct.)
+ */
pthread_mutex_t resize_mutex; /* resize mutex: add/del mutex */
unsigned int in_progress_resize, in_progress_destroy;
void (*cds_lfht_call_rcu)(struct rcu_head *head,
	void (*func)(struct rcu_head *head));
void (*cds_lfht_synchronize_rcu)(void);
void (*cds_lfht_rcu_read_lock)(void);
void (*cds_lfht_rcu_read_unlock)(void);
+ void (*cds_lfht_rcu_thread_offline)(void);
+ void (*cds_lfht_rcu_thread_online)(void);
unsigned long count; /* global approximate item count */
struct ht_items_count *percpu_count; /* per-cpu item count */
};
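A minimal sketch of the offline-around-mutex pattern the comment above
describes, assuming the urcu-qsbr flavor (rcu_thread_offline() and
rcu_thread_online() are part of its public API); the mutex and function
names here are illustrative, not code from this patch:

#include <pthread.h>
#include <urcu-qsbr.h>

static pthread_mutex_t update_mutex = PTHREAD_MUTEX_INITIALIZER;

static void update_under_mutex(void)
{
	/*
	 * Enter an extended quiescent state first: an offline thread does
	 * not hold up grace periods, so the synchronize_rcu() below can
	 * complete even while other threads sleep on this mutex.
	 */
	rcu_thread_offline();
	pthread_mutex_lock(&update_mutex);
	synchronize_rcu();	/* waits only on online readers */
	pthread_mutex_unlock(&update_mutex);
	rcu_thread_online();	/* resume acting as an RCU reader */
}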
iter = rcu_dereference(iter_prev->p.next);
assert(iter_prev->p.reverse_hash <= node->p.reverse_hash);
for (;;) {
- /* TODO: check if removed */
if (unlikely(!clear_flag(iter)))
goto insert;
- /* TODO: check if removed */
if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
goto insert;
next = rcu_dereference(clear_flag(iter)->p.next);
}
}
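For context on the reverse_hash ordering this loop relies on: the table
keeps nodes sorted by bit-reversed hash (the split-ordered list
technique), so doubling the table splits buckets without ever reordering
the chain. A naive, hedged equivalent of the bit reversal (the file's
bit_reverse_ulong() uses faster table/shift tricks; this version only
illustrates the semantics):

/* Reverse the bits of v: bit 0 becomes the most significant bit. */
static unsigned long naive_bit_reverse_ulong(unsigned long v)
{
	unsigned long r = 0;
	unsigned int i;

	for (i = 0; i < sizeof(v) * 8; i++) {
		r = (r << 1) | (v & 1UL);
		v >>= 1;
	}
	return r;
}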
+/*
+ * Hold the RCU read lock to protect _cds_lfht_add against memory
+ * reclaim that could be performed by other call_rcu worker threads (ABA
+ * problem). (See the sketch after init_table_link() below.)
+ */
static
void init_table_link(struct cds_lfht *ht, unsigned long i, unsigned long len)
{
unsigned long j;
+ ht->cds_lfht_rcu_thread_online();
ht->cds_lfht_rcu_read_lock();
for (j = 0; j < len; j++) {
struct cds_lfht_node *new_node =
break;
}
ht->cds_lfht_rcu_read_unlock();
+ ht->cds_lfht_rcu_thread_offline();
}
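Why init_table_link() pairs thread online/offline around the read lock:
with the QSBR flavor, only online threads count as readers, so code
running in a context that is normally offline (such as a call_rcu
worker) must come online before its read-side critical section protects
anything. A hedged sketch of the pairing rule, with a hypothetical node
type and list head:

#include <urcu-qsbr.h>

struct item {
	struct item *next;
};

static void traverse(struct item **head)
{
	struct item *it;

	rcu_thread_online();	/* required before rcu_read_lock() in QSBR */
	rcu_read_lock();
	for (it = rcu_dereference(*head); it; it = rcu_dereference(it->next)) {
		/* it cannot be freed and reused (ABA) while we read */
	}
	rcu_read_unlock();
	rcu_thread_offline();
}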
-/*
- * Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
- * problem).
- */
static
void init_table(struct cds_lfht *ht,
unsigned long first_order, unsigned long len_order)
}
}
+/*
+ * Hold the RCU read lock to protect _cds_lfht_remove against memory
+ * reclaim that could be performed by other call_rcu worker threads (ABA
+ * problem).
+ * For a single level, we logically remove and garbage collect each node.
+ *
+ * As a design choice, we perform logical removal and garbage collection on a
+ * node-per-node basis to simplify this algorithm. We also assume keeping good
+ * cache locality of the operation would outweigh the possible performance
+ * gain that could be achieved by batching garbage collection for multiple
+ * levels. However, this would have to be justified by benchmarks.
+ *
+ * Concurrent removal and add operations are helping us perform garbage
+ * collection of logically removed nodes. We guarantee that all logically
+ * removed nodes have been garbage-collected (unlinked) before call_rcu is
+ * invoked to free a whole level of dummy nodes (after a grace period).
+ *
+ * Logical removal and garbage collection can therefore be done in batch or
+ * on a node-per-node basis, as long as the guarantee above holds. (See the
+ * two-phase removal sketch after remove_table() below.)
+ */
static
void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
{
unsigned long j;
+ ht->cds_lfht_rcu_thread_online();
ht->cds_lfht_rcu_read_lock();
for (j = 0; j < len; j++) {
struct cds_lfht_node *fini_node =
break;
}
ht->cds_lfht_rcu_read_unlock();
+ ht->cds_lfht_rcu_thread_offline();
}
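A hedged sketch of the two-phase removal the comment above relies on,
in the low-order-bit flagging style this file already uses (clear_flag()
appears earlier; REMOVED_FLAG, flag_removed() and the node type are
simplified stand-ins here):

#include <urcu-qsbr.h>
#include <urcu/uatomic.h>

#define REMOVED_FLAG	(1UL << 0)	/* illustrative: low bit of next */

struct node {
	struct node *next;
};

static struct node *flag_removed(struct node *n)
{
	return (struct node *) ((unsigned long) n | REMOVED_FLAG);
}

static void logical_remove(struct node *n)
{
	struct node *old, *flagged;

	/*
	 * Phase 1: logical removal. Atomically tag n->next so concurrent
	 * traversals see n as dead and help unlink it.
	 */
	do {
		old = rcu_dereference(n->next);
		flagged = flag_removed(old);
	} while (uatomic_cmpxchg(&n->next, old, flagged) != old);
	/*
	 * Phase 2: garbage collection is performed by helping traversals;
	 * n is physically unlinked before call_rcu reclaims it.
	 */
}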
-/*
- * Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
- * problem).
- */
static
void fini_table(struct cds_lfht *ht,
unsigned long first_order, unsigned long len_order)
void (*func)(struct rcu_head *head)),
void (*cds_lfht_synchronize_rcu)(void),
void (*cds_lfht_rcu_read_lock)(void),
- void (*cds_lfht_rcu_read_unlock)(void))
+ void (*cds_lfht_rcu_read_unlock)(void),
+ void (*cds_lfht_rcu_thread_offline)(void),
+ void (*cds_lfht_rcu_thread_online)(void))
{
struct cds_lfht *ht;
unsigned long order;
ht->cds_lfht_synchronize_rcu = cds_lfht_synchronize_rcu;
ht->cds_lfht_rcu_read_lock = cds_lfht_rcu_read_lock;
ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
+ ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
+ ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
ht->percpu_count = alloc_per_cpu_items_count();
/* this mutex must not nest inside a read-side critical section */
pthread_mutex_init(&ht->resize_mutex, NULL);
order = get_count_order_ulong(max(init_size, MIN_TABLE_SIZE)) + 1;
ht->flags = flags;
+ ht->cds_lfht_rcu_thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
init_table(ht, 0, order);
pthread_mutex_unlock(&ht->resize_mutex);
+ ht->cds_lfht_rcu_thread_online();
return ht;
}
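A hedged usage sketch of the extended constructor with the QSBR flavor,
which supplies all six callbacks. Only the callback tail is confirmed by
this patch; the leading hash/compare/seed/size/flags arguments and the
callback signatures are assumed from the era's public header, and the
stub callbacks are placeholders:

#include <stddef.h>
#include <urcu-qsbr.h>
#include <urcu/rculfhash.h>	/* header path assumed */

static unsigned long my_hash(void *key, size_t length, unsigned long seed)
{
	return (unsigned long) key ^ seed;	/* placeholder hash */
}

static int my_compare(void *key1, size_t key1_len,
		void *key2, size_t key2_len)
{
	return key1 != key2;	/* placeholder: pointer equality */
}

static struct cds_lfht *make_table(void)
{
	return _cds_lfht_new(my_hash, my_compare, 0x42UL /* seed */,
			1024 /* init_size */, CDS_LFHT_AUTO_RESIZE,
			call_rcu, synchronize_rcu,
			rcu_read_lock, rcu_read_unlock,
			rcu_thread_offline, rcu_thread_online);
}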
{
resize_target_update_count(ht, new_size);
CMM_STORE_SHARED(ht->t.resize_initiated, 1);
+ ht->cds_lfht_rcu_thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
+ ht->cds_lfht_rcu_thread_online();
}
static
void do_resize_cb(struct rcu_head *head)
{
	struct rcu_resize_work *work =
		caa_container_of(head, struct rcu_resize_work, head);
	struct cds_lfht *ht = work->ht;
+ ht->cds_lfht_rcu_thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
+ ht->cds_lfht_rcu_thread_online();
poison_free(work);
cmm_smp_mb(); /* finish resize before decrement */
uatomic_dec(&ht->in_progress_resize);
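The cmm_smp_mb() before the decrement pairs with the teardown side,
which must not proceed while any resize is still in flight. A hedged
sketch of what that waiting side might look like:

#include <poll.h>
#include <urcu/uatomic.h>

/* Spin, sleeping briefly, until all in-flight resizes have finished. */
static void wait_resizes_done(struct cds_lfht *ht)
{
	while (uatomic_read(&ht->in_progress_resize))
		poll(NULL, 0, 100);	/* sleep 100 ms between checks */
}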