#include <urcu/uatomic.h>
#include <urcu/compiler.h>
#include <urcu/rculfhash.h>
-#include <rculfhash-internal.h>
+#include <urcu/static/urcu-signal-nr.h>
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
+#include "rculfhash-internal.h"
#include "workqueue.h"
#include "urcu-die.h"
#include "urcu-utils.h"
+#include "compat-smp.h"
/*
* Split-counters lazily update the global counter each 1024
};
static struct urcu_workqueue *cds_lfht_workqueue;
-static unsigned long cds_lfht_workqueue_user_count;
/*
* Mutex ensuring mutual exclusion between workqueue initialization and
*/
static int cds_lfht_workqueue_atfork_nesting;
+static void __attribute__((destructor)) cds_lfht_exit(void);
static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
#else
static
-void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht __attribute__((unused)),
+ struct cds_lfht_iter *iter __attribute__((unused)))
{
}
* Returns 0 if no bit is set, else returns the position of the most
* significant bit (from 1 to 32 on 32-bit, from 1 to 64 on 64-bit).
*/
-#if defined(__i386) || defined(__x86_64)
+#if defined(URCU_ARCH_X86)
static inline
unsigned int fls_u32(uint32_t x)
{
#define HAS_FLS_U32
#endif
-#if defined(__x86_64)
+#if defined(URCU_ARCH_AMD64)
static inline
unsigned int fls_u64(uint64_t x)
{
* Return the minimum order for which x <= (1UL << order).
* Return -1 if x is 0.
*/
+static
int cds_lfht_get_count_order_u32(uint32_t x)
{
if (!x)
static long split_count_mask = -1;
static int split_count_order = -1;
-#if defined(HAVE_SYSCONF)
static void ht_init_nr_cpus_mask(void)
{
long maxcpus;
- maxcpus = sysconf(_SC_NPROCESSORS_CONF);
+ maxcpus = get_possible_cpus_array_len();
if (maxcpus <= 0) {
nr_cpus_mask = -2;
return;
maxcpus = 1UL << cds_lfht_get_count_order_ulong(maxcpus);
nr_cpus_mask = maxcpus - 1;
}
-#else /* #if defined(HAVE_SYSCONF) */
-static void ht_init_nr_cpus_mask(void)
-{
- nr_cpus_mask = -2;
-}
-#endif /* #else #if defined(HAVE_SYSCONF) */
static
void alloc_split_items_count(struct cds_lfht *ht)
if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
return;
- dbg_printf("del set global %ld\n", count);
+ dbg_printf("del set global %lu\n", count);
/*
* Don't shrink table if the number of nodes is below a
* certain threshold.
}
static
-int is_removed(struct cds_lfht_node *node)
+int is_removed(const struct cds_lfht_node *node)
{
return ((unsigned long) node) & REMOVED_FLAG;
}
work[thread].len = partition_len;
work[thread].start = thread * partition_len;
work[thread].fct = fct;
- ret = pthread_create(&(work[thread].thread_id), ht->resize_attr,
+ ret = pthread_create(&(work[thread].thread_id),
+ ht->caller_resize_attr ? &ht->resize_attr : NULL,
partition_resize_thread, &work[thread]);
if (ret == EAGAIN) {
/*
* For 32-bit architectures, use the order allocator.
*/
static
-const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+const struct cds_lfht_mm_type *get_mm_type(
+ unsigned long max_nr_buckets __attribute__((unused)))
{
return &cds_lfht_mm_order;
}
ht->flags = flags;
ht->flavor = flavor;
- ht->resize_attr = attr;
+ ht->caller_resize_attr = attr;
+ if (attr)
+ ht->resize_attr = *attr;
alloc_split_items_count(ht);
/* this mutex should not nest in read-side C.S. */
pthread_mutex_init(&ht->resize_mutex, NULL);
iter->next = next;
}
-void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
+void cds_lfht_next_duplicate(struct cds_lfht *ht __attribute__((unused)),
+ cds_lfht_match_fct match,
const void *key, struct cds_lfht_iter *iter)
{
struct cds_lfht_node *node, *next;
iter->next = next;
}
-void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_next(struct cds_lfht *ht __attribute__((unused)),
+ struct cds_lfht_iter *iter)
{
struct cds_lfht_node *node, *next;
return ret;
}
-int cds_lfht_is_node_deleted(struct cds_lfht_node *node)
+int cds_lfht_is_node_deleted(const struct cds_lfht_node *node)
{
return is_removed(CMM_LOAD_SHARED(node->next));
}
+static
+bool cds_lfht_is_empty(struct cds_lfht *ht)
+{
+ struct cds_lfht_node *node, *next;
+ bool empty = true;
+ bool was_online;
+
+ was_online = ht->flavor->read_ongoing();
+ if (!was_online) {
+ ht->flavor->thread_online();
+ ht->flavor->read_lock();
+ }
+ /* Check that the table is empty */
+ node = bucket_at(ht, 0);
+ do {
+ next = rcu_dereference(node->next);
+ if (!is_bucket(next)) {
+ empty = false;
+ break;
+ }
+ node = clear_flag(next);
+ } while (!is_end(node));
+ if (!was_online) {
+ ht->flavor->read_unlock();
+ ht->flavor->thread_offline();
+ }
+ return empty;
+}
+
static
int cds_lfht_delete_bucket(struct cds_lfht *ht)
{
return 0;
}
+static
+void do_auto_resize_destroy_cb(struct urcu_work *work)
+{
+ struct cds_lfht *ht = caa_container_of(work, struct cds_lfht, destroy_work);
+ int ret;
+
+ ht->flavor->register_thread();
+ ret = cds_lfht_delete_bucket(ht);
+ if (ret)
+ urcu_die(ret);
+ free_split_items_count(ht);
+ ret = pthread_mutex_destroy(&ht->resize_mutex);
+ if (ret)
+ urcu_die(ret);
+ ht->flavor->unregister_thread();
+ poison_free(ht);
+}
+
/*
* Should only be called when no more concurrent readers nor writers can
* possibly access the table.
int ret;
if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+ /*
+ * Perform error-checking for emptiness before queuing
+ * work, so we can return error to the caller. This runs
+ * concurrently with ongoing resize.
+ */
+ if (!cds_lfht_is_empty(ht))
+ return -EPERM;
/* Cancel ongoing resize operations. */
_CMM_STORE_SHARED(ht->in_progress_destroy, 1);
- /* Wait for in-flight resize operations to complete */
- urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+ if (attr) {
+ *attr = ht->caller_resize_attr;
+ ht->caller_resize_attr = NULL;
+ }
+ /*
+ * Queue destroy work after prior queued resize
+ * operations. Given there are no concurrent writers
+ * accessing the hash table at this point, no resize
+ * operations can be queued after this destroy work.
+ */
+ urcu_workqueue_queue_work(cds_lfht_workqueue,
+ &ht->destroy_work, do_auto_resize_destroy_cb);
+ return 0;
}
ret = cds_lfht_delete_bucket(ht);
if (ret)
return ret;
free_split_items_count(ht);
if (attr)
- *attr = ht->resize_attr;
+ *attr = ht->caller_resize_attr;
ret = pthread_mutex_destroy(&ht->resize_mutex);
if (ret)
ret = -EBUSY;
- if (ht->flags & CDS_LFHT_AUTO_RESIZE)
- cds_lfht_fini_worker(ht->flavor);
poison_free(ht);
return ret;
}
__cds_lfht_resize_lazy_launch(ht);
}
-static void cds_lfht_before_fork(void *priv)
+static void cds_lfht_before_fork(void *priv __attribute__((unused)))
{
if (cds_lfht_workqueue_atfork_nesting++)
return;
urcu_workqueue_pause_worker(cds_lfht_workqueue);
}
-static void cds_lfht_after_fork_parent(void *priv)
+static void cds_lfht_after_fork_parent(void *priv __attribute__((unused)))
{
if (--cds_lfht_workqueue_atfork_nesting)
return;
mutex_unlock(&cds_lfht_fork_mutex);
}
-static void cds_lfht_after_fork_child(void *priv)
+static void cds_lfht_after_fork_child(void *priv __attribute__((unused)))
{
if (--cds_lfht_workqueue_atfork_nesting)
return;
.after_fork_child = cds_lfht_after_fork_child,
};
-/* Block all signals to ensure we don't disturb the application. */
-static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
- void *priv)
+/*
+ * Block all signals for the workqueue worker thread to ensure we don't
+ * disturb the application. The SIGRCU signal needs to be unblocked for
+ * the urcu-signal flavor.
+ */
+static void cds_lfht_worker_init(
+ struct urcu_workqueue *workqueue __attribute__((unused)),
+ void *priv __attribute__((unused)))
{
int ret;
sigset_t mask;
- /* Block signal for entire process, so only our thread processes it. */
ret = sigfillset(&mask);
if (ret)
urcu_die(errno);
- ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ ret = sigdelset(&mask, SIGRCU);
+ if (ret)
+ urcu_die(errno);
+ ret = pthread_sigmask(SIG_SETMASK, &mask, NULL);
if (ret)
urcu_die(ret);
}
flavor->register_rculfhash_atfork(&cds_lfht_atfork);
mutex_lock(&cds_lfht_fork_mutex);
- if (cds_lfht_workqueue_user_count++)
- goto end;
- cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
- NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
-end:
+ if (!cds_lfht_workqueue)
+ cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+ NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
mutex_unlock(&cds_lfht_fork_mutex);
}
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+static void cds_lfht_exit(void)
{
mutex_lock(&cds_lfht_fork_mutex);
- if (--cds_lfht_workqueue_user_count)
- goto end;
- urcu_workqueue_destroy(cds_lfht_workqueue);
- cds_lfht_workqueue = NULL;
-end:
+ if (cds_lfht_workqueue) {
+ urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+ urcu_workqueue_destroy(cds_lfht_workqueue);
+ cds_lfht_workqueue = NULL;
+ }
mutex_unlock(&cds_lfht_fork_mutex);
-
- flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
}