workqueue: handle congestion by clearing queue
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 23:21:50 +0000 (19:21 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 23:21:50 +0000 (19:21 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
tests/benchmark/test_urcu_workqueue.c
urcu/workqueue-fifo.h

index 0074de05bbe967653e5b68d7ac548a5129e56cbe..9d45651ea2827148b4fcb795780159efda4d648e 100644 (file)
@@ -128,8 +128,9 @@ static int test_duration_enqueue(void)
        return !test_stop_enqueue;
 }
 
-static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
-static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
+static DEFINE_URCU_TLS(unsigned long long, nr_work_done);
+static DEFINE_URCU_TLS(unsigned long long, nr_incoming);
+static DEFINE_URCU_TLS(unsigned long long, nr_discard);
 
 static unsigned int nr_dispatchers;
 static unsigned int nr_workers;
@@ -140,6 +141,28 @@ struct test_work {
        struct urcu_work w;
 };
 
+static
+void discard_queue(struct urcu_workqueue *queue)
+{
+       struct urcu_worker dummy_worker;
+
+       urcu_worker_init(queue, &dummy_worker);
+       if (!urcu_workqueue_steal_all(queue, &dummy_worker))
+               return;
+       for (;;) {
+               struct urcu_work *work;
+               struct test_work *t;
+
+               work = urcu_dequeue_work(&dummy_worker);
+               if (!work)
+                       break;
+               t = caa_container_of(work, struct test_work, w);
+               printf_verbose("discard work %p\n", t);
+               URCU_TLS(nr_discard)++;
+               free(t);
+       }
+}
+
 static void *thr_dispatcher(void *_count)
 {
        unsigned long long *count = _count;
@@ -161,16 +184,18 @@ static void *thr_dispatcher(void *_count)
 
                if (!work)
                        goto fail;
-retry:
-               printf_verbose("attempt queue work %p\n", work);
+               printf_verbose("incoming work %p\n", work);
+               URCU_TLS(nr_incoming)++;
                ret = urcu_queue_work(&workqueue, &work->w);
                if (ret == URCU_ENQUEUE_FULL) {
                        printf_verbose("queue work %p (queue full)\n", work);
-                       (void) poll(NULL, 0, 10);
-                       goto retry;
+                       printf_verbose("discard work %p\n", work);
+                       URCU_TLS(nr_discard)++;
+                       free(work);
+                       discard_queue(&workqueue);
+                       continue;
                }
                printf_verbose("queue work %p (ok)\n", work);
-               URCU_TLS(nr_enqueues)++;
 
                if (caa_unlikely(dispatch_delay_loops))
                        loop_sleep(dispatch_delay_loops);
@@ -180,11 +205,13 @@ fail:
        }
 
        uatomic_inc(&test_enqueue_stopped);
-       count[0] = URCU_TLS(nr_enqueues);
+       count[0] = URCU_TLS(nr_incoming);
+       count[1] = URCU_TLS(nr_discard);
        printf_verbose("dispatcher thread_end, tid %lu, "
-                       "enqueues %llu\n",
+                       "incoming %llu discard %llu\n",
                        urcu_get_thread_id(),
-                       URCU_TLS(nr_enqueues));
+                       URCU_TLS(nr_incoming),
+                       URCU_TLS(nr_discard));
        return ((void*)1);
 }
 
@@ -223,7 +250,7 @@ static void *thr_worker(void *_count)
                                break;
                        t = caa_container_of(work, struct test_work, w);
                        printf_verbose("dequeue work %p\n", t);
-                       URCU_TLS(nr_dequeues)++;
+                       URCU_TLS(nr_work_done)++;
                        if (caa_unlikely(work_loops))
                                loop_sleep(work_loops);
                        free(t);
@@ -236,8 +263,8 @@ end:
        printf_verbose("worker thread_end, tid %lu, "
                        "dequeues %llu\n",
                        urcu_get_thread_id(),
-                       URCU_TLS(nr_dequeues));
-       count[0] = URCU_TLS(nr_dequeues);
+                       URCU_TLS(nr_work_done));
+       count[0] = URCU_TLS(nr_work_done);
        return ((void*)2);
 }
 
@@ -262,8 +289,7 @@ int main(int argc, char **argv)
        pthread_t *tid_dispatcher, *tid_worker;
        void *tret;
        unsigned long long *count_dispatcher, *count_worker;
-       unsigned long long tot_enqueues = 0, tot_dequeues = 0;
-       unsigned long long end_dequeues = 0;
+       unsigned long long tot_incoming = 0, tot_work_done = 0, tot_discard = 0;
        int i, a, retval = 0;
        int worker_flags = 0;
 
@@ -349,7 +375,8 @@ int main(int argc, char **argv)
 
        tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
        tid_worker = calloc(nr_workers, sizeof(*tid_worker));
-       count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
+       count_dispatcher = calloc(nr_dispatchers,
+                               2 * sizeof(*count_dispatcher));
        count_worker = calloc(nr_workers, sizeof(*count_worker));
        if (test_steal)
                worker_flags |= URCU_WORKER_STEAL;
@@ -359,7 +386,7 @@ int main(int argc, char **argv)
 
        for (i = 0; i < nr_dispatchers; i++) {
                err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
-                                    &count_dispatcher[i]);
+                                    &count_dispatcher[2 * i]);
                if (err != 0)
                        exit(1);
        }
@@ -396,22 +423,27 @@ int main(int argc, char **argv)
                err = pthread_join(tid_dispatcher[i], &tret);
                if (err != 0)
                        exit(1);
-               tot_enqueues += count_dispatcher[i];
+               tot_incoming += count_dispatcher[2 * i];
+               tot_discard += count_dispatcher[(2 * i) + 1];
        }
        for (i = 0; i < nr_workers; i++) {
                err = pthread_join(tid_worker[i], &tret);
                if (err != 0)
                        exit(1);
-               tot_dequeues += count_worker[i];
+               tot_work_done += count_worker[i];
        }
 
        printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
                "work_loops %lu nr_workers %3u "
-               "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu "
-               "work_stealing %s\n",
+               "nr_incoming %12llu nr_work_done %12llu nr_discard %12llu "
+               "max_queue_len %lu work_stealing %s\n",
                argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
-               nr_workers, tot_enqueues, tot_dequeues, max_queue_len,
-               test_steal ? "enabled" : "disabled");
+               nr_workers, tot_incoming, tot_work_done, tot_discard,
+               max_queue_len, test_steal ? "enabled" : "disabled");
+       if (nr_incoming != nr_work_done + nr_discard) {
+               printf("ERROR: nr_incoming does not match sum of work done and discard.\n");
+               retval = -1;
+       }
        free(count_dispatcher);
        free(count_worker);
        free(tid_dispatcher);
index 458d97663ff599b3163c797c7b239cfc79dcdf07..ff0c650571a175ae6c809eee41d9f3135131ce36 100644 (file)
@@ -502,4 +502,29 @@ void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
        __urcu_workqueue_wakeup_all(queue);
 }
 
+/*
+ * Use to let dispatcher steal work from the entire queue in case of
+ * stall. The "worker" parameter need to be intialized, but is usually
+ * not registered.
+ */
+static inline
+bool urcu_workqueue_steal_all(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
+{
+       struct urcu_worker *sibling;
+       bool has_work = false;
+
+       rcu_read_lock();
+       /* Steal from each worker */
+       cds_list_for_each_entry_rcu(sibling, &queue->sibling_head,
+                       sibling_node)
+               has_work |= ___urcu_grab_work(worker, &sibling->head,
+                                       &sibling->tail, 1);
+       rcu_read_unlock();
+
+       /* Steal from global workqueue */
+       has_work |= ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
+       return has_work;
+}
+
 #endif /* _URCU_WORKQUEUE_FIFO_H */
This page took 0.028375 seconds and 4 git commands to generate.