workqueue: handle congestion by clearing queue
[urcu.git] / tests / benchmark / test_urcu_workqueue.c
index 9ac235067030a98b995e186d59929268fcb51703..9d45651ea2827148b4fcb795780159efda4d648e 100644 (file)
@@ -52,7 +52,7 @@
 #include <urcu/wfstack.h>
 #include <urcu/workqueue-fifo.h>
 
-static volatile int test_go, test_stop_enqueue, test_stop_dequeue;
+static volatile int test_go, test_stop_enqueue;
 
 static unsigned long work_loops;
 
@@ -60,6 +60,10 @@ static unsigned long duration;
 
 static unsigned long dispatch_delay_loops;
 
+static unsigned long max_queue_len;
+
+static int test_steal;
+
 static inline void loop_sleep(unsigned long loops)
 {
        while (loops-- != 0)
@@ -119,18 +123,14 @@ static void set_affinity(void)
 /*
  * returns 0 if test should end.
  */
-static int test_duration_dequeue(void)
-{
-       return !test_stop_dequeue;
-}
-
 static int test_duration_enqueue(void)
 {
        return !test_stop_enqueue;
 }
 
-static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
-static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
+static DEFINE_URCU_TLS(unsigned long long, nr_work_done);
+static DEFINE_URCU_TLS(unsigned long long, nr_incoming);
+static DEFINE_URCU_TLS(unsigned long long, nr_discard);
 
 static unsigned int nr_dispatchers;
 static unsigned int nr_workers;
@@ -141,6 +141,28 @@ struct test_work {
        struct urcu_work w;
 };
 
+static
+void discard_queue(struct urcu_workqueue *queue)
+{
+       struct urcu_worker dummy_worker;
+
+       urcu_worker_init(queue, &dummy_worker);
+       if (!urcu_workqueue_steal_all(queue, &dummy_worker))
+               return;
+       for (;;) {
+               struct urcu_work *work;
+               struct test_work *t;
+
+               work = urcu_dequeue_work(&dummy_worker);
+               if (!work)
+                       break;
+               t = caa_container_of(work, struct test_work, w);
+               printf_verbose("discard work %p\n", t);
+               URCU_TLS(nr_discard)++;
+               free(t);
+       }
+}
+
 static void *thr_dispatcher(void *_count)
 {
        unsigned long long *count = _count;
@@ -158,11 +180,22 @@ static void *thr_dispatcher(void *_count)
 
        for (;;) {
                struct test_work *work = malloc(sizeof(*work));
+               enum urcu_enqueue_ret ret;
+
                if (!work)
                        goto fail;
-               printf_verbose("queue work %p\n", work);
-               urcu_queue_work(&workqueue, &work->w);
-               URCU_TLS(nr_enqueues)++;
+               printf_verbose("incoming work %p\n", work);
+               URCU_TLS(nr_incoming)++;
+               ret = urcu_queue_work(&workqueue, &work->w);
+               if (ret == URCU_ENQUEUE_FULL) {
+                       printf_verbose("queue work %p (queue full)\n", work);
+                       printf_verbose("discard work %p\n", work);
+                       URCU_TLS(nr_discard)++;
+                       free(work);
+                       discard_queue(&workqueue);
+                       continue;
+               }
+               printf_verbose("queue work %p (ok)\n", work);
 
                if (caa_unlikely(dispatch_delay_loops))
                        loop_sleep(dispatch_delay_loops);
@@ -172,11 +205,13 @@ fail:
        }
 
        uatomic_inc(&test_enqueue_stopped);
-       count[0] = URCU_TLS(nr_enqueues);
+       count[0] = URCU_TLS(nr_incoming);
+       count[1] = URCU_TLS(nr_discard);
        printf_verbose("dispatcher thread_end, tid %lu, "
-                       "enqueues %llu\n",
+                       "incoming %llu discard %llu\n",
                        urcu_get_thread_id(),
-                       URCU_TLS(nr_enqueues));
+                       URCU_TLS(nr_incoming),
+                       URCU_TLS(nr_discard));
        return ((void*)1);
 }
 
@@ -185,7 +220,6 @@ static void *thr_worker(void *_count)
        unsigned long long *count = _count;
        unsigned int counter = 0;
        struct urcu_worker worker;
-       int blocking = 1;
 
        printf_verbose("thread_begin %s, tid %lu\n",
                        "worker", urcu_get_thread_id());
@@ -193,8 +227,7 @@ static void *thr_worker(void *_count)
        set_affinity();
 
        rcu_register_thread();
-       urcu_worker_init(&worker, URCU_WORKER_STEAL);
-       //urcu_worker_init(&worker, 0);
+       urcu_worker_init(&workqueue, &worker);
        urcu_worker_register(&workqueue, &worker);
 
        while (!test_go)
@@ -203,9 +236,11 @@ static void *thr_worker(void *_count)
        cmm_smp_mb();
 
        for (;;) {
-               int batch_work_count = 0;
+               enum urcu_accept_ret ret;
 
-               urcu_accept_work(&workqueue, &worker, blocking);
+               ret = urcu_accept_work(&worker);
+               if (ret == URCU_ACCEPT_SHUTDOWN)
+                       break;
                for (;;) {
                        struct urcu_work *work;
                        struct test_work *t;
@@ -215,17 +250,11 @@ static void *thr_worker(void *_count)
                                break;
                        t = caa_container_of(work, struct test_work, w);
                        printf_verbose("dequeue work %p\n", t);
-                       batch_work_count++;
-                       URCU_TLS(nr_dequeues)++;
+                       URCU_TLS(nr_work_done)++;
                        if (caa_unlikely(work_loops))
                                loop_sleep(work_loops);
                        free(t);
                }
-               if (!test_duration_dequeue())
-                       blocking = 0;
-               if (caa_unlikely(!test_duration_dequeue()
-                               && !batch_work_count))
-                       break;
        }
 end:
        urcu_worker_unregister(&workqueue, &worker);
@@ -234,8 +263,8 @@ end:
        printf_verbose("worker thread_end, tid %lu, "
                        "dequeues %llu\n",
                        urcu_get_thread_id(),
-                       URCU_TLS(nr_dequeues));
-       count[0] = URCU_TLS(nr_dequeues);
+                       URCU_TLS(nr_work_done));
+       count[0] = URCU_TLS(nr_work_done);
        return ((void*)2);
 }
 
@@ -249,6 +278,8 @@ static void show_usage(int argc, char **argv)
        printf("        [-v] (verbose output)\n");
        printf("        [-a cpu#] [-a cpu#]... (affinity)\n");
        printf("        [-w] Wait for worker to empty stack\n");
+       printf("        [-m len] (Max queue length. 0 means infinite.))\n");
+       printf("        [-s] (Enable work-stealing between workers.))\n");
        printf("\n");
 }
 
@@ -258,9 +289,9 @@ int main(int argc, char **argv)
        pthread_t *tid_dispatcher, *tid_worker;
        void *tret;
        unsigned long long *count_dispatcher, *count_worker;
-       unsigned long long tot_enqueues = 0, tot_dequeues = 0;
-       unsigned long long end_dequeues = 0;
+       unsigned long long tot_incoming = 0, tot_work_done = 0, tot_discard = 0;
        int i, a, retval = 0;
+       int worker_flags = 0;
 
        if (argc < 4) {
                show_usage(argc, argv);
@@ -299,6 +330,13 @@ int main(int argc, char **argv)
                        use_affinity = 1;
                        printf_verbose("Adding CPU %d affinity\n", a);
                        break;
+               case 'm':
+                       if (argc < i + 2) {
+                               show_usage(argc, argv);
+                               return -1;
+                       }
+                       max_queue_len = atol(argv[++i]);
+                       break;
                case 'c':
                        if (argc < i + 2) {
                                show_usage(argc, argv);
@@ -319,6 +357,9 @@ int main(int argc, char **argv)
                case 'w':
                        test_wait_empty = 1;
                        break;
+               case 's':
+                       test_steal = 1;
+                       break;
                }
        }
 
@@ -334,15 +375,18 @@ int main(int argc, char **argv)
 
        tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
        tid_worker = calloc(nr_workers, sizeof(*tid_worker));
-       count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
+       count_dispatcher = calloc(nr_dispatchers,
+                               2 * sizeof(*count_dispatcher));
        count_worker = calloc(nr_workers, sizeof(*count_worker));
-       urcu_workqueue_init(&workqueue);
+       if (test_steal)
+               worker_flags |= URCU_WORKER_STEAL;
+       urcu_workqueue_init(&workqueue, max_queue_len, worker_flags);
 
        next_aff = 0;
 
        for (i = 0; i < nr_dispatchers; i++) {
                err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
-                                    &count_dispatcher[i]);
+                                    &count_dispatcher[2 * i]);
                if (err != 0)
                        exit(1);
        }
@@ -373,29 +417,33 @@ int main(int argc, char **argv)
                        sleep(1);
                }
        }
-       test_stop_dequeue = 1;
-
-       /* Send finish to all workers */
-       urcu_workqueue_wakeup_all(&workqueue);
+       urcu_workqueue_shutdown(&workqueue);
 
        for (i = 0; i < nr_dispatchers; i++) {
                err = pthread_join(tid_dispatcher[i], &tret);
                if (err != 0)
                        exit(1);
-               tot_enqueues += count_dispatcher[i];
+               tot_incoming += count_dispatcher[2 * i];
+               tot_discard += count_dispatcher[(2 * i) + 1];
        }
        for (i = 0; i < nr_workers; i++) {
                err = pthread_join(tid_worker[i], &tret);
                if (err != 0)
                        exit(1);
-               tot_dequeues += count_worker[i];
+               tot_work_done += count_worker[i];
        }
 
        printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
                "work_loops %lu nr_workers %3u "
-               "nr_enqueues %12llu nr_dequeues %12llu\n",
+               "nr_incoming %12llu nr_work_done %12llu nr_discard %12llu "
+               "max_queue_len %lu work_stealing %s\n",
                argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
-               nr_workers, tot_enqueues, tot_dequeues);
+               nr_workers, tot_incoming, tot_work_done, tot_discard,
+               max_queue_len, test_steal ? "enabled" : "disabled");
+       if (nr_incoming != nr_work_done + nr_discard) {
+               printf("ERROR: nr_incoming does not match sum of work done and discard.\n");
+               retval = -1;
+       }
        free(count_dispatcher);
        free(count_worker);
        free(tid_dispatcher);
This page took 0.034951 seconds and 4 git commands to generate.