workqueue: implement shutdown
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 15:13:24 +0000 (11:13 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 15:13:24 +0000 (11:13 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
tests/benchmark/test_urcu_workqueue.c
urcu/workqueue-fifo.h

index 9ac235067030a98b995e186d59929268fcb51703..d12d9e8659f2f4d4e27f93626835288bf511c534 100644 (file)
@@ -52,7 +52,7 @@
 #include <urcu/wfstack.h>
 #include <urcu/workqueue-fifo.h>
 
-static volatile int test_go, test_stop_enqueue, test_stop_dequeue;
+static volatile int test_go, test_stop_enqueue;
 
 static unsigned long work_loops;
 
@@ -119,11 +119,6 @@ static void set_affinity(void)
 /*
  * returns 0 if test should end.
  */
-static int test_duration_dequeue(void)
-{
-       return !test_stop_dequeue;
-}
-
 static int test_duration_enqueue(void)
 {
        return !test_stop_enqueue;
@@ -185,7 +180,6 @@ static void *thr_worker(void *_count)
        unsigned long long *count = _count;
        unsigned int counter = 0;
        struct urcu_worker worker;
-       int blocking = 1;
 
        printf_verbose("thread_begin %s, tid %lu\n",
                        "worker", urcu_get_thread_id());
@@ -203,9 +197,11 @@ static void *thr_worker(void *_count)
        cmm_smp_mb();
 
        for (;;) {
-               int batch_work_count = 0;
+               enum urcu_accept_ret ret;
 
-               urcu_accept_work(&workqueue, &worker, blocking);
+               ret = urcu_accept_work(&workqueue, &worker);
+               if (ret == URCU_ACCEPT_SHUTDOWN)
+                       break;
                for (;;) {
                        struct urcu_work *work;
                        struct test_work *t;
@@ -215,17 +211,11 @@ static void *thr_worker(void *_count)
                                break;
                        t = caa_container_of(work, struct test_work, w);
                        printf_verbose("dequeue work %p\n", t);
-                       batch_work_count++;
                        URCU_TLS(nr_dequeues)++;
                        if (caa_unlikely(work_loops))
                                loop_sleep(work_loops);
                        free(t);
                }
-               if (!test_duration_dequeue())
-                       blocking = 0;
-               if (caa_unlikely(!test_duration_dequeue()
-                               && !batch_work_count))
-                       break;
        }
 end:
        urcu_worker_unregister(&workqueue, &worker);
@@ -373,10 +363,7 @@ int main(int argc, char **argv)
                        sleep(1);
                }
        }
-       test_stop_dequeue = 1;
-
-       /* Send finish to all workers */
-       urcu_workqueue_wakeup_all(&workqueue);
+       urcu_workqueue_shutdown(&workqueue);
 
        for (i = 0; i < nr_dispatchers; i++) {
                err = pthread_join(tid_dispatcher[i], &tret);
index 0c85ca77c6cf6cc5c146e6bc5d7db355c5ea4548..6256bff8218c87673fdfa865b39c4ec87ae6da83 100644 (file)
 #include <pthread.h>
 #include <assert.h>
 
+enum urcu_accept_ret {
+       URCU_ACCEPT_WORK        = 0,
+       URCU_ACCEPT_SHUTDOWN    = 1,
+};
+
 /*
  * We use RCU to steal work from siblings. Therefore, one of RCU flavors
  * need to be included before this header. All worker that participate
@@ -53,6 +58,8 @@ struct urcu_workqueue {
        /* RCU linked list head of siblings for work stealing. */
        struct cds_list_head sibling_head;
        pthread_mutex_t sibling_lock;   /* Protect sibling list updates */
+
+       bool shutdown;                  /* Shutdown performed */
 };
 
 struct urcu_worker {
@@ -75,6 +82,7 @@ void urcu_workqueue_init(struct urcu_workqueue *queue)
        __cds_wfcq_init(&queue->head, &queue->tail);
        urcu_wait_queue_init(&queue->waitqueue);
        CDS_INIT_LIST_HEAD(&queue->sibling_head);
+       queue->shutdown = false;
 }
 
 static inline
@@ -108,7 +116,7 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
 }
 
 static inline
-void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
+void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
 {
        struct urcu_waiters waiters;
 
@@ -284,9 +292,8 @@ end:
 }
 
 static inline
-void urcu_accept_work(struct urcu_workqueue *queue,
-               struct urcu_worker *worker,
-               int blocking)
+enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
 {
        enum cds_wfcq_ret wfcq_ret;
 
@@ -302,8 +309,9 @@ void urcu_accept_work(struct urcu_workqueue *queue,
        /* Try to steal work from sibling instead of blocking */
        if (__urcu_steal_work(queue, worker))
                goto do_work;
-       if (!blocking)
-               return;
+       /* No more work to do, check shutdown state */
+       if (CMM_LOAD_SHARED(queue->shutdown))
+               return URCU_ACCEPT_SHUTDOWN;
        urcu_wait_set_state(&worker->wait_node,
                        URCU_WAIT_WAITING);
        if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
@@ -353,6 +361,7 @@ do_work:
         * they can steal from us.
         */
        (void) __urcu_wakeup_siblings(queue, worker);
+       return URCU_ACCEPT_WORK;
 }
 
 static inline
@@ -382,4 +391,13 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
        return caa_container_of(node, struct urcu_work, node);
 }
 
+static inline
+void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
+{
+       /* Set shutdown */
+       CMM_STORE_SHARED(queue->shutdown, true);
+       /* Wakeup all workers */
+       __urcu_workqueue_wakeup_all(queue);
+}
+
 #endif /* _URCU_WORKQUEUE_FIFO_H */
This page took 0.02775 seconds and 4 git commands to generate.