workqueue: move work stealing flag to workqueue
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 22:34:56 +0000 (18:34 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 22:34:56 +0000 (18:34 -0400)
Ensure all workers using the same queue use the same flags.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
tests/benchmark/test_urcu_workqueue.c
urcu/workqueue-fifo.h

index 4c070861530c08ca2028cd0059abad5144c12d2f..0074de05bbe967653e5b68d7ac548a5129e56cbe 100644 (file)
@@ -62,6 +62,8 @@ static unsigned long dispatch_delay_loops;
 
 static unsigned long max_queue_len;
 
+static int test_steal;
+
 static inline void loop_sleep(unsigned long loops)
 {
        while (loops-- != 0)
@@ -198,8 +200,7 @@ static void *thr_worker(void *_count)
        set_affinity();
 
        rcu_register_thread();
-       urcu_worker_init(&workqueue, &worker, URCU_WORKER_STEAL);
-       //urcu_worker_init(&workqueue, &worker, 0);
+       urcu_worker_init(&workqueue, &worker);
        urcu_worker_register(&workqueue, &worker);
 
        while (!test_go)
@@ -251,6 +252,7 @@ static void show_usage(int argc, char **argv)
        printf("        [-a cpu#] [-a cpu#]... (affinity)\n");
        printf("        [-w] Wait for worker to empty stack\n");
        printf("        [-m len] (Max queue length. 0 means infinite.))\n");
+       printf("        [-s] (Enable work-stealing between workers.))\n");
        printf("\n");
 }
 
@@ -263,6 +265,7 @@ int main(int argc, char **argv)
        unsigned long long tot_enqueues = 0, tot_dequeues = 0;
        unsigned long long end_dequeues = 0;
        int i, a, retval = 0;
+       int worker_flags = 0;
 
        if (argc < 4) {
                show_usage(argc, argv);
@@ -328,6 +331,9 @@ int main(int argc, char **argv)
                case 'w':
                        test_wait_empty = 1;
                        break;
+               case 's':
+                       test_steal = 1;
+                       break;
                }
        }
 
@@ -345,7 +351,9 @@ int main(int argc, char **argv)
        tid_worker = calloc(nr_workers, sizeof(*tid_worker));
        count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
        count_worker = calloc(nr_workers, sizeof(*count_worker));
-       urcu_workqueue_init(&workqueue, max_queue_len);
+       if (test_steal)
+               worker_flags |= URCU_WORKER_STEAL;
+       urcu_workqueue_init(&workqueue, max_queue_len, worker_flags);
 
        next_aff = 0;
 
@@ -399,9 +407,11 @@ int main(int argc, char **argv)
 
        printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
                "work_loops %lu nr_workers %3u "
-               "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu\n",
+               "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu "
+               "work_stealing %s\n",
                argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
-               nr_workers, tot_enqueues, tot_dequeues, max_queue_len);
+               nr_workers, tot_enqueues, tot_dequeues, max_queue_len,
+               test_steal ? "enabled" : "disabled");
        free(count_dispatcher);
        free(count_worker);
        free(tid_dispatcher);
index 13d9278a5beca9baa46ea9ef3cc7f3b1981091e0..458d97663ff599b3163c797c7b239cfc79dcdf07 100644 (file)
@@ -67,6 +67,8 @@ struct urcu_workqueue {
        /* Maximum number of work entries (approximate). 0 means infinite. */
        unsigned long nr_work_max;
        unsigned long nr_work;          /* Current number of work items */
+
+       int worker_flags;               /* Worker flags */
        bool shutdown;                  /* Shutdown performed */
 };
 
@@ -91,7 +93,8 @@ enum urcu_worker_flags {
 
 static inline
 void urcu_workqueue_init(struct urcu_workqueue *queue,
-               unsigned long max_queue_len)
+               unsigned long max_queue_len,
+               int worker_flags)
 {
        __cds_wfcq_init(&queue->head, &queue->tail);
        urcu_wait_queue_init(&queue->waitqueue);
@@ -156,14 +159,14 @@ void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
 
 static inline
 void urcu_worker_init(struct urcu_workqueue *queue,
-               struct urcu_worker *worker, int flags)
+               struct urcu_worker *worker)
 {
        cds_wfcq_init(&worker->head, &worker->tail);
-       worker->flags = flags;
        urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
        worker->own = NULL;
        worker->wait_node.node.next = NULL;
        worker->queue = queue;
+       worker->flags = queue->worker_flags;
 }
 
 static inline
This page took 0.026915 seconds and 4 git commands to generate.