X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fworkqueue.c;h=1039d7297b9ce13446c4b81aee86fa217a005eab;hb=ea3a28a3f71dd02fb34ed4e3108f93275dbef89a;hp=8561a7a817dfd1dac7fff3aa4353682fc55edc79;hpb=9db9edccbed8465728f0bd9db12e03bd6f291726;p=urcu.git

diff --git a/src/workqueue.c b/src/workqueue.c
index 8561a7a..1039d72 100644
--- a/src/workqueue.c
+++ b/src/workqueue.c
@@ -25,7 +25,6 @@
 #include <stdio.h>
 #include <pthread.h>
 #include <signal.h>
-#include <assert.h>
 #include <stdlib.h>
 #include <stdint.h>
 #include <string.h>
@@ -36,13 +35,13 @@
 #include <sched.h>
 
 #include "compat-getcpu.h"
-#include "urcu/wfcqueue.h"
-#include "urcu-call-rcu.h"
-#include "urcu-pointer.h"
-#include "urcu/list.h"
-#include "urcu/futex.h"
-#include "urcu/tls-compat.h"
-#include "urcu/ref.h"
+#include <urcu/assert.h>
+#include <urcu/wfcqueue.h>
+#include <urcu/pointer.h>
+#include <urcu/list.h>
+#include <urcu/futex.h>
+#include <urcu/tls-compat.h>
+#include <urcu/ref.h>
 #include "urcu-die.h"
 
 #include "workqueue.h"
@@ -55,10 +54,9 @@
 struct urcu_workqueue {
 	/*
 	 * We do not align head on a different cache-line than tail
-	 * mainly because call_rcu callback-invocation threads use
-	 * batching ("splice") to get an entire list of callbacks, which
-	 * effectively empties the queue, and requires to touch the tail
-	 * anyway.
+	 * mainly because workqueue threads use batching ("splice") to
+	 * get an entire list of callbacks, which effectively empties
+	 * the queue, and requires to touch the tail anyway.
 	 */
 	struct cds_wfcq_tail cbs_tail;
 	struct cds_wfcq_head cbs_head;
@@ -94,7 +92,7 @@ struct urcu_workqueue_completion_work {
  * Losing affinity can be caused by CPU hotunplug/hotplug, or by
  * cpuset(7).
  */
-#if HAVE_SCHED_SETAFFINITY
+#ifdef HAVE_SCHED_SETAFFINITY
 static int set_thread_cpu_affinity(struct urcu_workqueue *workqueue)
 {
 	cpu_set_t mask;
@@ -109,11 +107,8 @@ static int set_thread_cpu_affinity(struct urcu_workqueue *workqueue)
 
 	CPU_ZERO(&mask);
 	CPU_SET(workqueue->cpu_affinity, &mask);
-#if SCHED_SETAFFINITY_ARGS == 2
-	ret = sched_setaffinity(0, &mask);
-#else
 	ret = sched_setaffinity(0, sizeof(mask), &mask);
-#endif
+
 	/*
 	 * EINVAL is fine: can be caused by hotunplugged CPUs, or by
 	 * cpuset(7). This is why we should always retry if we detect
@@ -126,7 +121,7 @@ static int set_thread_cpu_affinity(struct urcu_workqueue *workqueue)
 	return ret;
 }
 #else
-static int set_thread_cpu_affinity(struct urcu_workqueue *workqueue)
+static int set_thread_cpu_affinity(struct urcu_workqueue *workqueue __attribute__((unused)))
 {
 	return 0;
 }
@@ -136,16 +131,25 @@ static void futex_wait(int32_t *futex)
 {
 	/* Read condition before read futex */
 	cmm_smp_mb();
-	if (uatomic_read(futex) != -1)
-		return;
-	while (futex_async(futex, FUTEX_WAIT, -1, NULL, NULL, 0)) {
+	while (uatomic_read(futex) == -1) {
+		if (!futex_async(futex, FUTEX_WAIT, -1, NULL, NULL, 0)) {
+			/*
+			 * Prior wakeups queued by unrelated code using
+			 * the same address can cause futex wait to
+			 * return 0 even though the futex value is still
+			 * -1 (spurious wakeups). Check the value again
+			 * in user-space to validate whether it really
+			 * differs from -1.
+			 */
+			continue;
+		}
 		switch (errno) {
-		case EWOULDBLOCK:
+		case EAGAIN:
 			/* Value already changed. */
 			return;
 		case EINTR:
 			/* Retry if interrupted by signal. */
-			break;	/* Get out of switch. */
+			break;	/* Get out of switch. Check again. */
 		default:
 			/* Unexpected error. */
 			urcu_die(errno);
@@ -215,19 +219,19 @@ static void *workqueue_thread(void *arg)
 		cds_wfcq_init(&cbs_tmp_head, &cbs_tmp_tail);
 		splice_ret = __cds_wfcq_splice_blocking(&cbs_tmp_head,
 			&cbs_tmp_tail, &workqueue->cbs_head, &workqueue->cbs_tail);
-		assert(splice_ret != CDS_WFCQ_RET_WOULDBLOCK);
-		assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
+		urcu_posix_assert(splice_ret != CDS_WFCQ_RET_WOULDBLOCK);
+		urcu_posix_assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
 		if (splice_ret != CDS_WFCQ_RET_SRC_EMPTY) {
 			if (workqueue->grace_period_fct)
 				workqueue->grace_period_fct(workqueue, workqueue->priv);
 			cbcount = 0;
 			__cds_wfcq_for_each_blocking_safe(&cbs_tmp_head,
 					&cbs_tmp_tail, cbs, cbs_tmp_n) {
-				struct rcu_head *rhp;
+				struct urcu_work *uwp;
 
-				rhp = caa_container_of(cbs,
-					struct rcu_head, next);
-				rhp->func(rhp);
+				uwp = caa_container_of(cbs,
+					struct urcu_work, next);
+				uwp->func(uwp);
 				cbcount++;
 			}
 			uatomic_sub(&workqueue->qlen, cbcount);
@@ -240,25 +244,25 @@
 			if (cds_wfcq_empty(&workqueue->cbs_head,
 					&workqueue->cbs_tail)) {
 				futex_wait(&workqueue->futex);
-				(void) poll(NULL, 0, 10);
 				uatomic_dec(&workqueue->futex);
 				/*
 				 * Decrement futex before reading
-				 * call_rcu list.
+				 * urcu_work list.
 				 */
 				cmm_smp_mb();
-			} else {
-				(void) poll(NULL, 0, 10);
 			}
 		} else {
-			(void) poll(NULL, 0, 10);
+			if (cds_wfcq_empty(&workqueue->cbs_head,
+					&workqueue->cbs_tail)) {
+				(void) poll(NULL, 0, 10);
+			}
 		}
 		if (workqueue->worker_after_wake_up_fct)
 			workqueue->worker_after_wake_up_fct(workqueue, workqueue->priv);
 	}
 	if (!rt) {
 		/*
-		 * Read call_rcu list before write futex.
+		 * Read urcu_work list before write futex.
 		 */
 		cmm_smp_mb();
 		uatomic_set(&workqueue->futex, 0);
@@ -280,6 +284,7 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags,
 {
 	struct urcu_workqueue *workqueue;
 	int ret;
+	sigset_t newmask, oldmask;
 
 	workqueue = malloc(sizeof(*workqueue));
 	if (workqueue == NULL)
@@ -300,10 +305,20 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags,
 	workqueue->cpu_affinity = cpu_affinity;
 	workqueue->loop_count = 0;
 	cmm_smp_mb(); /* Structure initialized before pointer is planted. */
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
+
 	ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
 	if (ret) {
 		urcu_die(ret);
 	}
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
+
 	return workqueue;
 }
 
@@ -341,7 +356,7 @@ void urcu_workqueue_destroy(struct urcu_workqueue *workqueue)
 	if (urcu_workqueue_destroy_worker(workqueue)) {
 		urcu_die(errno);
 	}
-	assert(cds_wfcq_empty(&workqueue->cbs_head, &workqueue->cbs_tail));
+	urcu_posix_assert(cds_wfcq_empty(&workqueue->cbs_head, &workqueue->cbs_tail));
 	free(workqueue);
 }
 
@@ -460,13 +475,23 @@ void urcu_workqueue_resume_worker(struct urcu_workqueue *workqueue)
 void urcu_workqueue_create_worker(struct urcu_workqueue *workqueue)
 {
 	int ret;
+	sigset_t newmask, oldmask;
 
 	/* Clear workqueue state from parent. */
 	workqueue->flags &= ~URCU_WORKQUEUE_PAUSED;
 	workqueue->flags &= ~URCU_WORKQUEUE_PAUSE;
 	workqueue->tid = 0;
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
+
 	ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
 	if (ret) {
 		urcu_die(ret);
	}
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
 }
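
Note on the futex_wait() change above: the wait loop now re-checks the futex value in user space whenever FUTEX_WAIT returns 0, because wakeups queued by unrelated code on the same address can end the syscall while the value is still -1. Below is a minimal standalone sketch of that wait/wake pattern (Linux-only, using a raw syscall(SYS_futex, ...); the helper names sys_futex(), wait_until_woken() and wake_one_waiter() are illustrative, not liburcu API):

#include <errno.h>
#include <linux/futex.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/syscall.h>
#include <unistd.h>

static long sys_futex(int32_t *uaddr, int op, int32_t val)
{
	return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}

/* Sleep until *futex_word no longer reads -1. */
static void wait_until_woken(int32_t *futex_word)
{
	while (__atomic_load_n(futex_word, __ATOMIC_SEQ_CST) == -1) {
		if (!sys_futex(futex_word, FUTEX_WAIT, -1)) {
			/* Spurious wakeup: re-check the value in user space. */
			continue;
		}
		switch (errno) {
		case EAGAIN:	/* Value already changed: nothing to wait for. */
			return;
		case EINTR:	/* Interrupted by a signal: check and retry. */
			break;
		default:	/* Unexpected error. */
			abort();
		}
	}
}

/* Publish the state change, then wake one sleeping waiter. */
static void wake_one_waiter(int32_t *futex_word)
{
	__atomic_store_n(futex_word, 0, __ATOMIC_SEQ_CST);
	(void) sys_futex(futex_word, FUTEX_WAKE, 1);
}

As in the patched loop, the only exits are a user-space read observing a value other than -1, or EAGAIN from the kernel meaning the value had already changed before the thread could sleep.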
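
The urcu_workqueue_create() and urcu_workqueue_create_worker() hunks add the same signal-handling pattern: block every signal before pthread_create() so the worker thread starts with all signals blocked, then restore the caller's original mask. A small sketch of that pattern follows, with the made-up helper spawn_masked_worker() standing in for the liburcu functions:

#include <pthread.h>
#include <signal.h>
#include <stdlib.h>

static void *worker(void *arg)
{
	(void) arg;
	/* Runs with every signal blocked, inherited at creation time. */
	return NULL;
}

static pthread_t spawn_masked_worker(void)
{
	sigset_t newmask, oldmask;
	pthread_t tid;

	if (sigfillset(&newmask))
		abort();
	/* Block everything so the new thread inherits a fully blocked mask. */
	if (pthread_sigmask(SIG_BLOCK, &newmask, &oldmask))
		abort();

	if (pthread_create(&tid, NULL, worker, NULL))
		abort();

	/* Restore the caller's mask; only the worker keeps signals blocked. */
	if (pthread_sigmask(SIG_SETMASK, &oldmask, NULL))
		abort();

	return tid;
}

This keeps application signals from being delivered to the workqueue worker thread while leaving the creating thread's signal disposition untouched.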