From: Mathieu Desnoyers Date: Thu, 23 Oct 2014 23:21:50 +0000 (-0400) Subject: workqueue: handle congestion by clearing queue X-Git-Url: http://git.liburcu.org/?p=userspace-rcu.git;a=commitdiff_plain;h=f00ae1883551dd4150891251559c3d9a4a57eebe workqueue: handle congestion by clearing queue Signed-off-by: Mathieu Desnoyers --- diff --git a/tests/benchmark/test_urcu_workqueue.c b/tests/benchmark/test_urcu_workqueue.c index 0074de0..9d45651 100644 --- a/tests/benchmark/test_urcu_workqueue.c +++ b/tests/benchmark/test_urcu_workqueue.c @@ -128,8 +128,9 @@ static int test_duration_enqueue(void) return !test_stop_enqueue; } -static DEFINE_URCU_TLS(unsigned long long, nr_dequeues); -static DEFINE_URCU_TLS(unsigned long long, nr_enqueues); +static DEFINE_URCU_TLS(unsigned long long, nr_work_done); +static DEFINE_URCU_TLS(unsigned long long, nr_incoming); +static DEFINE_URCU_TLS(unsigned long long, nr_discard); static unsigned int nr_dispatchers; static unsigned int nr_workers; @@ -140,6 +141,28 @@ struct test_work { struct urcu_work w; }; +static +void discard_queue(struct urcu_workqueue *queue) +{ + struct urcu_worker dummy_worker; + + urcu_worker_init(queue, &dummy_worker); + if (!urcu_workqueue_steal_all(queue, &dummy_worker)) + return; + for (;;) { + struct urcu_work *work; + struct test_work *t; + + work = urcu_dequeue_work(&dummy_worker); + if (!work) + break; + t = caa_container_of(work, struct test_work, w); + printf_verbose("discard work %p\n", t); + URCU_TLS(nr_discard)++; + free(t); + } +} + static void *thr_dispatcher(void *_count) { unsigned long long *count = _count; @@ -161,16 +184,18 @@ static void *thr_dispatcher(void *_count) if (!work) goto fail; -retry: - printf_verbose("attempt queue work %p\n", work); + printf_verbose("incoming work %p\n", work); + URCU_TLS(nr_incoming)++; ret = urcu_queue_work(&workqueue, &work->w); if (ret == URCU_ENQUEUE_FULL) { printf_verbose("queue work %p (queue full)\n", work); - (void) poll(NULL, 0, 10); - goto retry; + printf_verbose("discard work %p\n", work); + URCU_TLS(nr_discard)++; + free(work); + discard_queue(&workqueue); + continue; } printf_verbose("queue work %p (ok)\n", work); - URCU_TLS(nr_enqueues)++; if (caa_unlikely(dispatch_delay_loops)) loop_sleep(dispatch_delay_loops); @@ -180,11 +205,13 @@ fail: } uatomic_inc(&test_enqueue_stopped); - count[0] = URCU_TLS(nr_enqueues); + count[0] = URCU_TLS(nr_incoming); + count[1] = URCU_TLS(nr_discard); printf_verbose("dispatcher thread_end, tid %lu, " - "enqueues %llu\n", + "incoming %llu discard %llu\n", urcu_get_thread_id(), - URCU_TLS(nr_enqueues)); + URCU_TLS(nr_incoming), + URCU_TLS(nr_discard)); return ((void*)1); } @@ -223,7 +250,7 @@ static void *thr_worker(void *_count) break; t = caa_container_of(work, struct test_work, w); printf_verbose("dequeue work %p\n", t); - URCU_TLS(nr_dequeues)++; + URCU_TLS(nr_work_done)++; if (caa_unlikely(work_loops)) loop_sleep(work_loops); free(t); @@ -236,8 +263,8 @@ end: printf_verbose("worker thread_end, tid %lu, " "dequeues %llu\n", urcu_get_thread_id(), - URCU_TLS(nr_dequeues)); - count[0] = URCU_TLS(nr_dequeues); + URCU_TLS(nr_work_done)); + count[0] = URCU_TLS(nr_work_done); return ((void*)2); } @@ -262,8 +289,7 @@ int main(int argc, char **argv) pthread_t *tid_dispatcher, *tid_worker; void *tret; unsigned long long *count_dispatcher, *count_worker; - unsigned long long tot_enqueues = 0, tot_dequeues = 0; - unsigned long long end_dequeues = 0; + unsigned long long tot_incoming = 0, tot_work_done = 0, tot_discard = 0; int i, a, retval = 0; int worker_flags = 0; @@ -349,7 +375,8 @@ int main(int argc, char **argv) tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher)); tid_worker = calloc(nr_workers, sizeof(*tid_worker)); - count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher)); + count_dispatcher = calloc(nr_dispatchers, + 2 * sizeof(*count_dispatcher)); count_worker = calloc(nr_workers, sizeof(*count_worker)); if (test_steal) worker_flags |= URCU_WORKER_STEAL; @@ -359,7 +386,7 @@ int main(int argc, char **argv) for (i = 0; i < nr_dispatchers; i++) { err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher, - &count_dispatcher[i]); + &count_dispatcher[2 * i]); if (err != 0) exit(1); } @@ -396,22 +423,27 @@ int main(int argc, char **argv) err = pthread_join(tid_dispatcher[i], &tret); if (err != 0) exit(1); - tot_enqueues += count_dispatcher[i]; + tot_incoming += count_dispatcher[2 * i]; + tot_discard += count_dispatcher[(2 * i) + 1]; } for (i = 0; i < nr_workers; i++) { err = pthread_join(tid_worker[i], &tret); if (err != 0) exit(1); - tot_dequeues += count_worker[i]; + tot_work_done += count_worker[i]; } printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu " "work_loops %lu nr_workers %3u " - "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu " - "work_stealing %s\n", + "nr_incoming %12llu nr_work_done %12llu nr_discard %12llu " + "max_queue_len %lu work_stealing %s\n", argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops, - nr_workers, tot_enqueues, tot_dequeues, max_queue_len, - test_steal ? "enabled" : "disabled"); + nr_workers, tot_incoming, tot_work_done, tot_discard, + max_queue_len, test_steal ? "enabled" : "disabled"); + if (nr_incoming != nr_work_done + nr_discard) { + printf("ERROR: nr_incoming does not match sum of work done and discard.\n"); + retval = -1; + } free(count_dispatcher); free(count_worker); free(tid_dispatcher); diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index 458d976..ff0c650 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -502,4 +502,29 @@ void urcu_workqueue_shutdown(struct urcu_workqueue *queue) __urcu_workqueue_wakeup_all(queue); } +/* + * Use to let dispatcher steal work from the entire queue in case of + * stall. The "worker" parameter need to be intialized, but is usually + * not registered. + */ +static inline +bool urcu_workqueue_steal_all(struct urcu_workqueue *queue, + struct urcu_worker *worker) +{ + struct urcu_worker *sibling; + bool has_work = false; + + rcu_read_lock(); + /* Steal from each worker */ + cds_list_for_each_entry_rcu(sibling, &queue->sibling_head, + sibling_node) + has_work |= ___urcu_grab_work(worker, &sibling->head, + &sibling->tail, 1); + rcu_read_unlock(); + + /* Steal from global workqueue */ + has_work |= ___urcu_grab_work(worker, &queue->head, &queue->tail, 0); + return has_work; +} + #endif /* _URCU_WORKQUEUE_FIFO_H */