#include <urcu/wfstack.h>
#include <urcu/workqueue-fifo.h>
-static volatile int test_go, test_stop_enqueue, test_stop_dequeue;
+static volatile int test_go, test_stop_enqueue;
static unsigned long work_loops;
static unsigned long dispatch_delay_loops;
+static unsigned long max_queue_len;
+
+static int test_steal;
+
static inline void loop_sleep(unsigned long loops)
{
while (loops-- != 0)
/*
* returns 0 if test should end.
*/
-static int test_duration_dequeue(void)
-{
- return !test_stop_dequeue;
-}
-
static int test_duration_enqueue(void)
{
return !test_stop_enqueue;
}
-static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
-static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
+static DEFINE_URCU_TLS(unsigned long long, nr_work_done);
+static DEFINE_URCU_TLS(unsigned long long, nr_incoming);
+static DEFINE_URCU_TLS(unsigned long long, nr_discard);
static unsigned int nr_dispatchers;
static unsigned int nr_workers;
struct urcu_work w;
};
+static
+void discard_queue(struct urcu_workqueue *queue)
+{
+ struct urcu_worker dummy_worker;
+
+ urcu_worker_init(queue, &dummy_worker);
+ if (!urcu_workqueue_steal_all(queue, &dummy_worker))
+ return;
+ for (;;) {
+ struct urcu_work *work;
+ struct test_work *t;
+
+ work = urcu_dequeue_work(&dummy_worker);
+ if (!work)
+ break;
+ t = caa_container_of(work, struct test_work, w);
+ printf_verbose("discard work %p\n", t);
+ URCU_TLS(nr_discard)++;
+ free(t);
+ }
+}
+
static void *thr_dispatcher(void *_count)
{
unsigned long long *count = _count;
for (;;) {
struct test_work *work = malloc(sizeof(*work));
+ enum urcu_enqueue_ret ret;
+
if (!work)
goto fail;
- printf_verbose("queue work %p\n", work);
- urcu_queue_work(&workqueue, &work->w);
- URCU_TLS(nr_enqueues)++;
+ printf_verbose("incoming work %p\n", work);
+ URCU_TLS(nr_incoming)++;
+ ret = urcu_queue_work(&workqueue, &work->w);
+ if (ret == URCU_ENQUEUE_FULL) {
+ printf_verbose("queue work %p (queue full)\n", work);
+ printf_verbose("discard work %p\n", work);
+ URCU_TLS(nr_discard)++;
+ free(work);
+ discard_queue(&workqueue);
+ continue;
+ }
+ printf_verbose("queue work %p (ok)\n", work);
if (caa_unlikely(dispatch_delay_loops))
loop_sleep(dispatch_delay_loops);
}
uatomic_inc(&test_enqueue_stopped);
- count[0] = URCU_TLS(nr_enqueues);
+ count[0] = URCU_TLS(nr_incoming);
+ count[1] = URCU_TLS(nr_discard);
printf_verbose("dispatcher thread_end, tid %lu, "
- "enqueues %llu\n",
+ "incoming %llu discard %llu\n",
urcu_get_thread_id(),
- URCU_TLS(nr_enqueues));
+ URCU_TLS(nr_incoming),
+ URCU_TLS(nr_discard));
return ((void*)1);
}
unsigned long long *count = _count;
unsigned int counter = 0;
struct urcu_worker worker;
- int blocking = 1;
printf_verbose("thread_begin %s, tid %lu\n",
"worker", urcu_get_thread_id());
set_affinity();
rcu_register_thread();
- urcu_worker_init(&worker, URCU_WORKER_STEAL);
- //urcu_worker_init(&worker, 0);
+ urcu_worker_init(&workqueue, &worker);
urcu_worker_register(&workqueue, &worker);
while (!test_go)
cmm_smp_mb();
for (;;) {
- int batch_work_count = 0;
+ enum urcu_accept_ret ret;
- urcu_accept_work(&workqueue, &worker, blocking);
+ ret = urcu_accept_work(&worker);
+ if (ret == URCU_ACCEPT_SHUTDOWN)
+ break;
for (;;) {
struct urcu_work *work;
struct test_work *t;
break;
t = caa_container_of(work, struct test_work, w);
printf_verbose("dequeue work %p\n", t);
- batch_work_count++;
- URCU_TLS(nr_dequeues)++;
+ URCU_TLS(nr_work_done)++;
if (caa_unlikely(work_loops))
loop_sleep(work_loops);
free(t);
}
- if (!test_duration_dequeue())
- blocking = 0;
- if (caa_unlikely(!test_duration_dequeue()
- && !batch_work_count))
- break;
}
end:
urcu_worker_unregister(&workqueue, &worker);
printf_verbose("worker thread_end, tid %lu, "
"dequeues %llu\n",
urcu_get_thread_id(),
- URCU_TLS(nr_dequeues));
- count[0] = URCU_TLS(nr_dequeues);
+ URCU_TLS(nr_work_done));
+ count[0] = URCU_TLS(nr_work_done);
return ((void*)2);
}
printf(" [-v] (verbose output)\n");
printf(" [-a cpu#] [-a cpu#]... (affinity)\n");
printf(" [-w] Wait for worker to empty stack\n");
+ printf(" [-m len] (Max queue length. 0 means infinite.))\n");
+ printf(" [-s] (Enable work-stealing between workers.))\n");
printf("\n");
}
pthread_t *tid_dispatcher, *tid_worker;
void *tret;
unsigned long long *count_dispatcher, *count_worker;
- unsigned long long tot_enqueues = 0, tot_dequeues = 0;
- unsigned long long end_dequeues = 0;
+ unsigned long long tot_incoming = 0, tot_work_done = 0, tot_discard = 0;
int i, a, retval = 0;
+ int worker_flags = 0;
if (argc < 4) {
show_usage(argc, argv);
use_affinity = 1;
printf_verbose("Adding CPU %d affinity\n", a);
break;
+ case 'm':
+ if (argc < i + 2) {
+ show_usage(argc, argv);
+ return -1;
+ }
+ max_queue_len = atol(argv[++i]);
+ break;
case 'c':
if (argc < i + 2) {
show_usage(argc, argv);
case 'w':
test_wait_empty = 1;
break;
+ case 's':
+ test_steal = 1;
+ break;
}
}
tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
tid_worker = calloc(nr_workers, sizeof(*tid_worker));
- count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
+ count_dispatcher = calloc(nr_dispatchers,
+ 2 * sizeof(*count_dispatcher));
count_worker = calloc(nr_workers, sizeof(*count_worker));
- urcu_workqueue_init(&workqueue);
+ if (test_steal)
+ worker_flags |= URCU_WORKER_STEAL;
+ urcu_workqueue_init(&workqueue, max_queue_len, worker_flags);
next_aff = 0;
for (i = 0; i < nr_dispatchers; i++) {
err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
- &count_dispatcher[i]);
+ &count_dispatcher[2 * i]);
if (err != 0)
exit(1);
}
sleep(1);
}
}
- test_stop_dequeue = 1;
-
- /* Send finish to all workers */
- urcu_workqueue_wakeup_all(&workqueue);
+ urcu_workqueue_shutdown(&workqueue);
for (i = 0; i < nr_dispatchers; i++) {
err = pthread_join(tid_dispatcher[i], &tret);
if (err != 0)
exit(1);
- tot_enqueues += count_dispatcher[i];
+ tot_incoming += count_dispatcher[2 * i];
+ tot_discard += count_dispatcher[(2 * i) + 1];
}
for (i = 0; i < nr_workers; i++) {
err = pthread_join(tid_worker[i], &tret);
if (err != 0)
exit(1);
- tot_dequeues += count_worker[i];
+ tot_work_done += count_worker[i];
}
printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
"work_loops %lu nr_workers %3u "
- "nr_enqueues %12llu nr_dequeues %12llu\n",
+ "nr_incoming %12llu nr_work_done %12llu nr_discard %12llu "
+ "max_queue_len %lu work_stealing %s\n",
argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
- nr_workers, tot_enqueues, tot_dequeues);
+ nr_workers, tot_incoming, tot_work_done, tot_discard,
+ max_queue_len, test_steal ? "enabled" : "disabled");
+ if (nr_incoming != nr_work_done + nr_discard) {
+ printf("ERROR: nr_incoming does not match sum of work done and discard.\n");
+ retval = -1;
+ }
free(count_dispatcher);
free(count_worker);
free(tid_dispatcher);