X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Frculfhash.c;h=37fbc1b7c44c1ef5bcd347b19b7ef3a483d6c016;hb=b047e7a793421e3ff1f5dca2b27c72751a1f4db4;hp=7c0b9fb8081fbfd4db089def5665d01d7b6cda90;hpb=5cfe81b7ddff9543d451746de9965cac58c67182;p=userspace-rcu.git diff --git a/src/rculfhash.c b/src/rculfhash.c index 7c0b9fb..37fbc1b 100644 --- a/src/rculfhash.c +++ b/src/rculfhash.c @@ -274,10 +274,10 @@ #include #include #include -#include #include #include #include +#include "rculfhash-internal.h" #include "workqueue.h" #include "urcu-die.h" #include "urcu-utils.h" @@ -363,7 +363,6 @@ struct partition_resize_work { }; static struct urcu_workqueue *cds_lfht_workqueue; -static unsigned long cds_lfht_workqueue_user_count; /* * Mutex ensuring mutual exclusion between workqueue initialization and @@ -380,8 +379,8 @@ static struct urcu_atfork cds_lfht_atfork; */ static int cds_lfht_workqueue_atfork_nesting; +static void __attribute__((destructor)) cds_lfht_exit(void); static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor); -static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor); #ifdef CONFIG_CDS_LFHT_ITER_DEBUG @@ -1251,6 +1250,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, struct partition_resize_work *work; int ret; unsigned long thread, nr_threads; + sigset_t newmask, oldmask; urcu_posix_assert(nr_cpus_mask != -1); if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) @@ -1273,13 +1273,20 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, dbg_printf("error allocating for resize, single-threading\n"); goto fallback; } + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); + for (thread = 0; thread < nr_threads; thread++) { work[thread].ht = ht; work[thread].i = i; work[thread].len = partition_len; work[thread].start = thread * partition_len; work[thread].fct = fct; - ret = pthread_create(&(work[thread].thread_id), ht->resize_attr, + ret = pthread_create(&(work[thread].thread_id), + ht->caller_resize_attr ? &ht->resize_attr : NULL, partition_resize_thread, &work[thread]); if (ret == EAGAIN) { /* @@ -1294,6 +1301,10 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, } urcu_posix_assert(!ret); } + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); + for (thread = 0; thread < nr_threads; thread++) { ret = pthread_join(work[thread].thread_id, NULL); urcu_posix_assert(!ret); @@ -1632,7 +1643,9 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size, ht->flags = flags; ht->flavor = flavor; - ht->resize_attr = attr; + ht->caller_resize_attr = attr; + if (attr) + ht->resize_attr = *attr; alloc_split_items_count(ht); /* this mutex should not nest in read-side C.S. */ pthread_mutex_init(&ht->resize_mutex, NULL); @@ -1846,6 +1859,35 @@ int cds_lfht_is_node_deleted(const struct cds_lfht_node *node) return is_removed(CMM_LOAD_SHARED(node->next)); } +static +bool cds_lfht_is_empty(struct cds_lfht *ht) +{ + struct cds_lfht_node *node, *next; + bool empty = true; + bool was_online; + + was_online = ht->flavor->read_ongoing(); + if (!was_online) { + ht->flavor->thread_online(); + ht->flavor->read_lock(); + } + /* Check that the table is empty */ + node = bucket_at(ht, 0); + do { + next = rcu_dereference(node->next); + if (!is_bucket(next)) { + empty = false; + break; + } + node = clear_flag(next); + } while (!is_end(node)); + if (!was_online) { + ht->flavor->read_unlock(); + ht->flavor->thread_offline(); + } + return empty; +} + static int cds_lfht_delete_bucket(struct cds_lfht *ht) { @@ -1880,6 +1922,24 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) return 0; } +static +void do_auto_resize_destroy_cb(struct urcu_work *work) +{ + struct cds_lfht *ht = caa_container_of(work, struct cds_lfht, destroy_work); + int ret; + + ht->flavor->register_thread(); + ret = cds_lfht_delete_bucket(ht); + if (ret) + urcu_die(ret); + free_split_items_count(ht); + ret = pthread_mutex_destroy(&ht->resize_mutex); + if (ret) + urcu_die(ret); + ht->flavor->unregister_thread(); + poison_free(ht); +} + /* * Should only be called when no more concurrent readers nor writers can * possibly access the table. @@ -1889,22 +1949,38 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) int ret; if (ht->flags & CDS_LFHT_AUTO_RESIZE) { + /* + * Perform error-checking for emptiness before queuing + * work, so we can return error to the caller. This runs + * concurrently with ongoing resize. + */ + if (!cds_lfht_is_empty(ht)) + return -EPERM; /* Cancel ongoing resize operations. */ _CMM_STORE_SHARED(ht->in_progress_destroy, 1); - /* Wait for in-flight resize operations to complete */ - urcu_workqueue_flush_queued_work(cds_lfht_workqueue); + if (attr) { + *attr = ht->caller_resize_attr; + ht->caller_resize_attr = NULL; + } + /* + * Queue destroy work after prior queued resize + * operations. Given there are no concurrent writers + * accessing the hash table at this point, no resize + * operations can be queued after this destroy work. + */ + urcu_workqueue_queue_work(cds_lfht_workqueue, + &ht->destroy_work, do_auto_resize_destroy_cb); + return 0; } ret = cds_lfht_delete_bucket(ht); if (ret) return ret; free_split_items_count(ht); if (attr) - *attr = ht->resize_attr; + *attr = ht->caller_resize_attr; ret = pthread_mutex_destroy(&ht->resize_mutex); if (ret) ret = -EBUSY; - if (ht->flags & CDS_LFHT_AUTO_RESIZE) - cds_lfht_fini_worker(ht->flavor); poison_free(ht); return ret; } @@ -2163,51 +2239,24 @@ static struct urcu_atfork cds_lfht_atfork = { .after_fork_child = cds_lfht_after_fork_child, }; -/* - * Block all signals for the workqueue worker thread to ensure we don't - * disturb the application. The SIGRCU signal needs to be unblocked for - * the urcu-signal flavor. - */ -static void cds_lfht_worker_init( - struct urcu_workqueue *workqueue __attribute__((unused)), - void *priv __attribute__((unused))) -{ - int ret; - sigset_t mask; - - ret = sigfillset(&mask); - if (ret) - urcu_die(errno); - ret = sigdelset(&mask, SIGRCU); - if (ret) - urcu_die(errno); - ret = pthread_sigmask(SIG_SETMASK, &mask, NULL); - if (ret) - urcu_die(ret); -} - static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor) { flavor->register_rculfhash_atfork(&cds_lfht_atfork); mutex_lock(&cds_lfht_fork_mutex); - if (cds_lfht_workqueue_user_count++) - goto end; - cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL, - NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL); -end: + if (!cds_lfht_workqueue) + cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL, + NULL, NULL, NULL, NULL, NULL, NULL, NULL); mutex_unlock(&cds_lfht_fork_mutex); } -static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor) +static void cds_lfht_exit(void) { mutex_lock(&cds_lfht_fork_mutex); - if (--cds_lfht_workqueue_user_count) - goto end; - urcu_workqueue_destroy(cds_lfht_workqueue); - cds_lfht_workqueue = NULL; -end: + if (cds_lfht_workqueue) { + urcu_workqueue_flush_queued_work(cds_lfht_workqueue); + urcu_workqueue_destroy(cds_lfht_workqueue); + cds_lfht_workqueue = NULL; + } mutex_unlock(&cds_lfht_fork_mutex); - - flavor->unregister_rculfhash_atfork(&cds_lfht_atfork); }