workqueue: handle congestion by clearing queue
[urcu.git] / tests / benchmark / test_urcu_workqueue.c
index 4c070861530c08ca2028cd0059abad5144c12d2f..9d45651ea2827148b4fcb795780159efda4d648e 100644 (file)
@@ -62,6 +62,8 @@ static unsigned long dispatch_delay_loops;
 
 static unsigned long max_queue_len;
 
+static int test_steal;
+
 static inline void loop_sleep(unsigned long loops)
 {
        while (loops-- != 0)
@@ -126,8 +128,9 @@ static int test_duration_enqueue(void)
        return !test_stop_enqueue;
 }
 
-static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
-static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
+static DEFINE_URCU_TLS(unsigned long long, nr_work_done);
+static DEFINE_URCU_TLS(unsigned long long, nr_incoming);
+static DEFINE_URCU_TLS(unsigned long long, nr_discard);
 
 static unsigned int nr_dispatchers;
 static unsigned int nr_workers;
@@ -138,6 +141,28 @@ struct test_work {
        struct urcu_work w;
 };
 
+static
+void discard_queue(struct urcu_workqueue *queue)
+{
+       struct urcu_worker dummy_worker;
+
+       urcu_worker_init(queue, &dummy_worker);
+       if (!urcu_workqueue_steal_all(queue, &dummy_worker))
+               return;
+       for (;;) {
+               struct urcu_work *work;
+               struct test_work *t;
+
+               work = urcu_dequeue_work(&dummy_worker);
+               if (!work)
+                       break;
+               t = caa_container_of(work, struct test_work, w);
+               printf_verbose("discard work %p\n", t);
+               URCU_TLS(nr_discard)++;
+               free(t);
+       }
+}
+
 static void *thr_dispatcher(void *_count)
 {
        unsigned long long *count = _count;
@@ -159,16 +184,18 @@ static void *thr_dispatcher(void *_count)
 
                if (!work)
                        goto fail;
-retry:
-               printf_verbose("attempt queue work %p\n", work);
+               printf_verbose("incoming work %p\n", work);
+               URCU_TLS(nr_incoming)++;
                ret = urcu_queue_work(&workqueue, &work->w);
                if (ret == URCU_ENQUEUE_FULL) {
                        printf_verbose("queue work %p (queue full)\n", work);
-                       (void) poll(NULL, 0, 10);
-                       goto retry;
+                       printf_verbose("discard work %p\n", work);
+                       URCU_TLS(nr_discard)++;
+                       free(work);
+                       discard_queue(&workqueue);
+                       continue;
                }
                printf_verbose("queue work %p (ok)\n", work);
-               URCU_TLS(nr_enqueues)++;
 
                if (caa_unlikely(dispatch_delay_loops))
                        loop_sleep(dispatch_delay_loops);
@@ -178,11 +205,13 @@ fail:
        }
 
        uatomic_inc(&test_enqueue_stopped);
-       count[0] = URCU_TLS(nr_enqueues);
+       count[0] = URCU_TLS(nr_incoming);
+       count[1] = URCU_TLS(nr_discard);
        printf_verbose("dispatcher thread_end, tid %lu, "
-                       "enqueues %llu\n",
+                       "incoming %llu discard %llu\n",
                        urcu_get_thread_id(),
-                       URCU_TLS(nr_enqueues));
+                       URCU_TLS(nr_incoming),
+                       URCU_TLS(nr_discard));
        return ((void*)1);
 }
 
@@ -198,8 +227,7 @@ static void *thr_worker(void *_count)
        set_affinity();
 
        rcu_register_thread();
-       urcu_worker_init(&workqueue, &worker, URCU_WORKER_STEAL);
-       //urcu_worker_init(&workqueue, &worker, 0);
+       urcu_worker_init(&workqueue, &worker);
        urcu_worker_register(&workqueue, &worker);
 
        while (!test_go)
@@ -222,7 +250,7 @@ static void *thr_worker(void *_count)
                                break;
                        t = caa_container_of(work, struct test_work, w);
                        printf_verbose("dequeue work %p\n", t);
-                       URCU_TLS(nr_dequeues)++;
+                       URCU_TLS(nr_work_done)++;
                        if (caa_unlikely(work_loops))
                                loop_sleep(work_loops);
                        free(t);
@@ -235,8 +263,8 @@ end:
        printf_verbose("worker thread_end, tid %lu, "
                        "dequeues %llu\n",
                        urcu_get_thread_id(),
-                       URCU_TLS(nr_dequeues));
-       count[0] = URCU_TLS(nr_dequeues);
+                       URCU_TLS(nr_work_done));
+       count[0] = URCU_TLS(nr_work_done);
        return ((void*)2);
 }
 
@@ -251,6 +279,7 @@ static void show_usage(int argc, char **argv)
        printf("        [-a cpu#] [-a cpu#]... (affinity)\n");
        printf("        [-w] Wait for worker to empty stack\n");
        printf("        [-m len] (Max queue length. 0 means infinite.))\n");
+       printf("        [-s] (Enable work-stealing between workers.))\n");
        printf("\n");
 }
 
@@ -260,9 +289,9 @@ int main(int argc, char **argv)
        pthread_t *tid_dispatcher, *tid_worker;
        void *tret;
        unsigned long long *count_dispatcher, *count_worker;
-       unsigned long long tot_enqueues = 0, tot_dequeues = 0;
-       unsigned long long end_dequeues = 0;
+       unsigned long long tot_incoming = 0, tot_work_done = 0, tot_discard = 0;
        int i, a, retval = 0;
+       int worker_flags = 0;
 
        if (argc < 4) {
                show_usage(argc, argv);
@@ -328,6 +357,9 @@ int main(int argc, char **argv)
                case 'w':
                        test_wait_empty = 1;
                        break;
+               case 's':
+                       test_steal = 1;
+                       break;
                }
        }
 
@@ -343,15 +375,18 @@ int main(int argc, char **argv)
 
        tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
        tid_worker = calloc(nr_workers, sizeof(*tid_worker));
-       count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
+       count_dispatcher = calloc(nr_dispatchers,
+                               2 * sizeof(*count_dispatcher));
        count_worker = calloc(nr_workers, sizeof(*count_worker));
-       urcu_workqueue_init(&workqueue, max_queue_len);
+       if (test_steal)
+               worker_flags |= URCU_WORKER_STEAL;
+       urcu_workqueue_init(&workqueue, max_queue_len, worker_flags);
 
        next_aff = 0;
 
        for (i = 0; i < nr_dispatchers; i++) {
                err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
-                                    &count_dispatcher[i]);
+                                    &count_dispatcher[2 * i]);
                if (err != 0)
                        exit(1);
        }
@@ -388,20 +423,27 @@ int main(int argc, char **argv)
                err = pthread_join(tid_dispatcher[i], &tret);
                if (err != 0)
                        exit(1);
-               tot_enqueues += count_dispatcher[i];
+               tot_incoming += count_dispatcher[2 * i];
+               tot_discard += count_dispatcher[(2 * i) + 1];
        }
        for (i = 0; i < nr_workers; i++) {
                err = pthread_join(tid_worker[i], &tret);
                if (err != 0)
                        exit(1);
-               tot_dequeues += count_worker[i];
+               tot_work_done += count_worker[i];
        }
 
        printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
                "work_loops %lu nr_workers %3u "
-               "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu\n",
+               "nr_incoming %12llu nr_work_done %12llu nr_discard %12llu "
+               "max_queue_len %lu work_stealing %s\n",
                argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
-               nr_workers, tot_enqueues, tot_dequeues, max_queue_len);
+               nr_workers, tot_incoming, tot_work_done, tot_discard,
+               max_queue_len, test_steal ? "enabled" : "disabled");
+       if (nr_incoming != nr_work_done + nr_discard) {
+               printf("ERROR: nr_incoming does not match sum of work done and discard.\n");
+               retval = -1;
+       }
        free(count_dispatcher);
        free(count_worker);
        free(tid_dispatcher);
This page took 0.025931 seconds and 4 git commands to generate.