From 1b0a9891cd2f9100dc87745d77a4d0069a21adb7 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Thu, 23 Oct 2014 11:13:24 -0400 Subject: [PATCH] workqueue: implement shutdown Signed-off-by: Mathieu Desnoyers --- tests/benchmark/test_urcu_workqueue.c | 25 ++++++---------------- urcu/workqueue-fifo.h | 30 +++++++++++++++++++++------ 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/tests/benchmark/test_urcu_workqueue.c b/tests/benchmark/test_urcu_workqueue.c index 9ac2350..d12d9e8 100644 --- a/tests/benchmark/test_urcu_workqueue.c +++ b/tests/benchmark/test_urcu_workqueue.c @@ -52,7 +52,7 @@ #include #include -static volatile int test_go, test_stop_enqueue, test_stop_dequeue; +static volatile int test_go, test_stop_enqueue; static unsigned long work_loops; @@ -119,11 +119,6 @@ static void set_affinity(void) /* * returns 0 if test should end. */ -static int test_duration_dequeue(void) -{ - return !test_stop_dequeue; -} - static int test_duration_enqueue(void) { return !test_stop_enqueue; @@ -185,7 +180,6 @@ static void *thr_worker(void *_count) unsigned long long *count = _count; unsigned int counter = 0; struct urcu_worker worker; - int blocking = 1; printf_verbose("thread_begin %s, tid %lu\n", "worker", urcu_get_thread_id()); @@ -203,9 +197,11 @@ static void *thr_worker(void *_count) cmm_smp_mb(); for (;;) { - int batch_work_count = 0; + enum urcu_accept_ret ret; - urcu_accept_work(&workqueue, &worker, blocking); + ret = urcu_accept_work(&workqueue, &worker); + if (ret == URCU_ACCEPT_SHUTDOWN) + break; for (;;) { struct urcu_work *work; struct test_work *t; @@ -215,17 +211,11 @@ static void *thr_worker(void *_count) break; t = caa_container_of(work, struct test_work, w); printf_verbose("dequeue work %p\n", t); - batch_work_count++; URCU_TLS(nr_dequeues)++; if (caa_unlikely(work_loops)) loop_sleep(work_loops); free(t); } - if (!test_duration_dequeue()) - blocking = 0; - if (caa_unlikely(!test_duration_dequeue() - && !batch_work_count)) - break; } end: urcu_worker_unregister(&workqueue, &worker); @@ -373,10 +363,7 @@ int main(int argc, char **argv) sleep(1); } } - test_stop_dequeue = 1; - - /* Send finish to all workers */ - urcu_workqueue_wakeup_all(&workqueue); + urcu_workqueue_shutdown(&workqueue); for (i = 0; i < nr_dispatchers; i++) { err = pthread_join(tid_dispatcher[i], &tret); diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index 0c85ca7..6256bff 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -31,6 +31,11 @@ #include #include +enum urcu_accept_ret { + URCU_ACCEPT_WORK = 0, + URCU_ACCEPT_SHUTDOWN = 1, +}; + /* * We use RCU to steal work from siblings. Therefore, one of RCU flavors * need to be included before this header. All worker that participate @@ -53,6 +58,8 @@ struct urcu_workqueue { /* RCU linked list head of siblings for work stealing. */ struct cds_list_head sibling_head; pthread_mutex_t sibling_lock; /* Protect sibling list updates */ + + bool shutdown; /* Shutdown performed */ }; struct urcu_worker { @@ -75,6 +82,7 @@ void urcu_workqueue_init(struct urcu_workqueue *queue) __cds_wfcq_init(&queue->head, &queue->tail); urcu_wait_queue_init(&queue->waitqueue); CDS_INIT_LIST_HEAD(&queue->sibling_head); + queue->shutdown = false; } static inline @@ -108,7 +116,7 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work) } static inline -void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) +void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) { struct urcu_waiters waiters; @@ -284,9 +292,8 @@ end: } static inline -void urcu_accept_work(struct urcu_workqueue *queue, - struct urcu_worker *worker, - int blocking) +enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue, + struct urcu_worker *worker) { enum cds_wfcq_ret wfcq_ret; @@ -302,8 +309,9 @@ void urcu_accept_work(struct urcu_workqueue *queue, /* Try to steal work from sibling instead of blocking */ if (__urcu_steal_work(queue, worker)) goto do_work; - if (!blocking) - return; + /* No more work to do, check shutdown state */ + if (CMM_LOAD_SHARED(queue->shutdown)) + return URCU_ACCEPT_SHUTDOWN; urcu_wait_set_state(&worker->wait_node, URCU_WAIT_WAITING); if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) { @@ -353,6 +361,7 @@ do_work: * they can steal from us. */ (void) __urcu_wakeup_siblings(queue, worker); + return URCU_ACCEPT_WORK; } static inline @@ -382,4 +391,13 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker) return caa_container_of(node, struct urcu_work, node); } +static inline +void urcu_workqueue_shutdown(struct urcu_workqueue *queue) +{ + /* Set shutdown */ + CMM_STORE_SHARED(queue->shutdown, true); + /* Wakeup all workers */ + __urcu_workqueue_wakeup_all(queue); +} + #endif /* _URCU_WORKQUEUE_FIFO_H */ -- 2.34.1