workqueue: handle congestion by clearing queue
[userspace-rcu.git] / tests / benchmark / test_urcu_workqueue.c
CommitLineData
1abec5a0
MD
/*
 * test_urcu_workqueue.c
 *
 * Userspace RCU library - workqueue test
 *
 * Copyright February 2010-2014 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
23
24#define _GNU_SOURCE
25#include "config.h"
26#include <stdio.h>
27#include <pthread.h>
28#include <stdlib.h>
29#include <stdint.h>
30#include <stdbool.h>
31#include <string.h>
32#include <sys/types.h>
33#include <sys/wait.h>
34#include <unistd.h>
35#include <stdio.h>
36#include <assert.h>
37#include <errno.h>
38
39#include <urcu/arch.h>
40#include <urcu/tls-compat.h>
41#include <urcu/uatomic.h>
42#include "cpuset.h"
43#include "thread-id.h"
44
45/* hardcoded number of CPUs */
46#define NR_CPUS 16384
47
48#ifndef DYNAMIC_LINK_TEST
49#define _LGPL_SOURCE
50#endif
51#include <urcu.h>
52#include <urcu/wfstack.h>
53#include <urcu/workqueue-fifo.h>
54
/*
 * Start/stop flags written by main() and polled by the test threads
 * (threads spin on test_go; dispatchers stop once test_stop_enqueue is set).
 */
static volatile int test_go;
static volatile int test_stop_enqueue;

/* Parameters taken from the command line. */
static unsigned long duration;			/* test length, in seconds */
static unsigned long work_loops;		/* -c: busy loops per work item */
static unsigned long dispatch_delay_loops;	/* -d: busy loops between enqueues */
static unsigned long max_queue_len;		/* -m: queue length limit, 0 = infinite */
static int test_steal;				/* -s: non-zero enables work stealing */

/*
 * Busy-wait for @loops iterations, issuing a CPU relax hint on each one.
 * Used to simulate per-item work cost and dispatcher arrival delay.
 */
static inline void loop_sleep(unsigned long loops)
{
	for (; loops != 0; loops--)
		caa_cpu_relax();
}

/* Option flags, set by main() while parsing arguments. */
static int verbose_mode;	/* -v */
static int test_wait_empty;	/* -w */

/* Count of dispatcher threads that have left their enqueue loop. */
static int test_enqueue_stopped;

/* fprintf() to stderr, but only when verbose output (-v) is enabled. */
#define printf_verbose(...)				\
	do {						\
		if (verbose_mode)			\
			fprintf(stderr, __VA_ARGS__);	\
	} while (0)

84static unsigned int cpu_affinities[NR_CPUS];
85static unsigned int next_aff = 0;
86static int use_affinity = 0;
87
88pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;
89
90static void set_affinity(void)
91{
92#if HAVE_SCHED_SETAFFINITY
93 cpu_set_t mask;
94 int cpu, ret;
95#endif /* HAVE_SCHED_SETAFFINITY */
96
97 if (!use_affinity)
98 return;
99
100#if HAVE_SCHED_SETAFFINITY
101 ret = pthread_mutex_lock(&affinity_mutex);
102 if (ret) {
103 perror("Error in pthread mutex lock");
104 exit(-1);
105 }
106 cpu = cpu_affinities[next_aff++];
107 ret = pthread_mutex_unlock(&affinity_mutex);
108 if (ret) {
109 perror("Error in pthread mutex unlock");
110 exit(-1);
111 }
112
113 CPU_ZERO(&mask);
114 CPU_SET(cpu, &mask);
115#if SCHED_SETAFFINITY_ARGS == 2
116 sched_setaffinity(0, &mask);
117#else
118 sched_setaffinity(0, sizeof(mask), &mask);
119#endif
120#endif /* HAVE_SCHED_SETAFFINITY */
121}
122
123/*
124 * returns 0 if test should end.
125 */
1abec5a0
MD
126static int test_duration_enqueue(void)
127{
128 return !test_stop_enqueue;
129}
130
f00ae188
MD
131static DEFINE_URCU_TLS(unsigned long long, nr_work_done);
132static DEFINE_URCU_TLS(unsigned long long, nr_incoming);
133static DEFINE_URCU_TLS(unsigned long long, nr_discard);
1abec5a0
MD
134
135static unsigned int nr_dispatchers;
136static unsigned int nr_workers;
137
138static struct urcu_workqueue workqueue;
139
140struct test_work {
141 struct urcu_work w;
142};
143
f00ae188
MD
144static
145void discard_queue(struct urcu_workqueue *queue)
146{
147 struct urcu_worker dummy_worker;
148
149 urcu_worker_init(queue, &dummy_worker);
150 if (!urcu_workqueue_steal_all(queue, &dummy_worker))
151 return;
152 for (;;) {
153 struct urcu_work *work;
154 struct test_work *t;
155
156 work = urcu_dequeue_work(&dummy_worker);
157 if (!work)
158 break;
159 t = caa_container_of(work, struct test_work, w);
160 printf_verbose("discard work %p\n", t);
161 URCU_TLS(nr_discard)++;
162 free(t);
163 }
164}
165
1abec5a0
MD
166static void *thr_dispatcher(void *_count)
167{
168 unsigned long long *count = _count;
169 bool was_nonempty;
170
171 printf_verbose("thread_begin %s, tid %lu\n",
172 "dispatcher", urcu_get_thread_id());
173
174 set_affinity();
175
176 while (!test_go)
177 {
178 }
179 cmm_smp_mb();
180
181 for (;;) {
182 struct test_work *work = malloc(sizeof(*work));
8a2c74fe
MD
183 enum urcu_enqueue_ret ret;
184
1abec5a0
MD
185 if (!work)
186 goto fail;
f00ae188
MD
187 printf_verbose("incoming work %p\n", work);
188 URCU_TLS(nr_incoming)++;
8a2c74fe
MD
189 ret = urcu_queue_work(&workqueue, &work->w);
190 if (ret == URCU_ENQUEUE_FULL) {
191 printf_verbose("queue work %p (queue full)\n", work);
f00ae188
MD
192 printf_verbose("discard work %p\n", work);
193 URCU_TLS(nr_discard)++;
194 free(work);
195 discard_queue(&workqueue);
196 continue;
8a2c74fe
MD
197 }
198 printf_verbose("queue work %p (ok)\n", work);
1abec5a0
MD
199
200 if (caa_unlikely(dispatch_delay_loops))
201 loop_sleep(dispatch_delay_loops);
202fail:
203 if (caa_unlikely(!test_duration_enqueue()))
204 break;
205 }
206
207 uatomic_inc(&test_enqueue_stopped);
f00ae188
MD
208 count[0] = URCU_TLS(nr_incoming);
209 count[1] = URCU_TLS(nr_discard);
1abec5a0 210 printf_verbose("dispatcher thread_end, tid %lu, "
f00ae188 211 "incoming %llu discard %llu\n",
1abec5a0 212 urcu_get_thread_id(),
f00ae188
MD
213 URCU_TLS(nr_incoming),
214 URCU_TLS(nr_discard));
1abec5a0
MD
215 return ((void*)1);
216}
217
218static void *thr_worker(void *_count)
219{
220 unsigned long long *count = _count;
221 unsigned int counter = 0;
222 struct urcu_worker worker;
1abec5a0
MD
223
224 printf_verbose("thread_begin %s, tid %lu\n",
225 "worker", urcu_get_thread_id());
226
227 set_affinity();
228
229 rcu_register_thread();
2f02be5c 230 urcu_worker_init(&workqueue, &worker);
1abec5a0
MD
231 urcu_worker_register(&workqueue, &worker);
232
233 while (!test_go)
234 {
235 }
236 cmm_smp_mb();
237
238 for (;;) {
1b0a9891 239 enum urcu_accept_ret ret;
1abec5a0 240
8a2c74fe 241 ret = urcu_accept_work(&worker);
1b0a9891
MD
242 if (ret == URCU_ACCEPT_SHUTDOWN)
243 break;
1abec5a0
MD
244 for (;;) {
245 struct urcu_work *work;
246 struct test_work *t;
247
248 work = urcu_dequeue_work(&worker);
249 if (!work)
250 break;
251 t = caa_container_of(work, struct test_work, w);
252 printf_verbose("dequeue work %p\n", t);
f00ae188 253 URCU_TLS(nr_work_done)++;
1abec5a0
MD
254 if (caa_unlikely(work_loops))
255 loop_sleep(work_loops);
256 free(t);
257 }
1abec5a0
MD
258 }
259end:
260 urcu_worker_unregister(&workqueue, &worker);
261 rcu_unregister_thread();
262
263 printf_verbose("worker thread_end, tid %lu, "
264 "dequeues %llu\n",
265 urcu_get_thread_id(),
f00ae188
MD
266 URCU_TLS(nr_work_done));
267 count[0] = URCU_TLS(nr_work_done);
1abec5a0
MD
268 return ((void*)2);
269}
270
/*
 * Print command-line help to stdout. @argv[0] is used as the program name;
 * @argc is accepted for call-site symmetry but not used.
 */
static void show_usage(int argc, char **argv)
{
	(void) argc;

	printf("Usage : %s nr_workers nr_dispatchers duration (s) <OPTIONS>\n",
		argv[0]);
	printf("OPTIONS:\n");
	printf("	[-d delay] (dispatcher period (in loops))\n");
	printf("	[-c duration] (worker period (in loops))\n");
	printf("	[-v] (verbose output)\n");
	printf("	[-a cpu#] [-a cpu#]... (affinity)\n");
	printf("	[-w] (Wait for workers to empty workqueue.)\n");
	printf("	[-m len] (Max queue length. 0 means infinite.)\n");
	printf("	[-s] (Enable work-stealing between workers.)\n");
	printf("\n");
}

286int main(int argc, char **argv)
287{
288 int err;
289 pthread_t *tid_dispatcher, *tid_worker;
290 void *tret;
291 unsigned long long *count_dispatcher, *count_worker;
f00ae188 292 unsigned long long tot_incoming = 0, tot_work_done = 0, tot_discard = 0;
1abec5a0 293 int i, a, retval = 0;
2f02be5c 294 int worker_flags = 0;
1abec5a0
MD
295
296 if (argc < 4) {
297 show_usage(argc, argv);
298 return -1;
299 }
300
301 err = sscanf(argv[1], "%u", &nr_workers);
302 if (err != 1) {
303 show_usage(argc, argv);
304 return -1;
305 }
306
307 err = sscanf(argv[2], "%u", &nr_dispatchers);
308 if (err != 1) {
309 show_usage(argc, argv);
310 return -1;
311 }
312
313 err = sscanf(argv[3], "%lu", &duration);
314 if (err != 1) {
315 show_usage(argc, argv);
316 return -1;
317 }
318
319 for (i = 4; i < argc; i++) {
320 if (argv[i][0] != '-')
321 continue;
322 switch (argv[i][1]) {
323 case 'a':
324 if (argc < i + 2) {
325 show_usage(argc, argv);
326 return -1;
327 }
328 a = atoi(argv[++i]);
329 cpu_affinities[next_aff++] = a;
330 use_affinity = 1;
331 printf_verbose("Adding CPU %d affinity\n", a);
332 break;
8a2c74fe
MD
333 case 'm':
334 if (argc < i + 2) {
335 show_usage(argc, argv);
336 return -1;
337 }
338 max_queue_len = atol(argv[++i]);
339 break;
1abec5a0
MD
340 case 'c':
341 if (argc < i + 2) {
342 show_usage(argc, argv);
343 return -1;
344 }
345 work_loops = atol(argv[++i]);
346 break;
347 case 'd':
348 if (argc < i + 2) {
349 show_usage(argc, argv);
350 return -1;
351 }
352 dispatch_delay_loops = atol(argv[++i]);
353 break;
354 case 'v':
355 verbose_mode = 1;
356 break;
357 case 'w':
358 test_wait_empty = 1;
359 break;
2f02be5c
MD
360 case 's':
361 test_steal = 1;
362 break;
1abec5a0
MD
363 }
364 }
365
366 printf_verbose("running test for %lu seconds, %u dispatchers, "
367 "%u workers.\n",
368 duration, nr_dispatchers, nr_workers);
369 if (test_wait_empty)
370 printf_verbose("Wait for workers to empty workqueue.\n");
371 printf_verbose("Work duration: %lu loops.\n", work_loops);
372 printf_verbose("Dispatcher arrival delay: %lu loops.\n", dispatch_delay_loops);
373 printf_verbose("thread %-6s, tid %lu\n",
374 "main", urcu_get_thread_id());
375
376 tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
377 tid_worker = calloc(nr_workers, sizeof(*tid_worker));
f00ae188
MD
378 count_dispatcher = calloc(nr_dispatchers,
379 2 * sizeof(*count_dispatcher));
1abec5a0 380 count_worker = calloc(nr_workers, sizeof(*count_worker));
2f02be5c
MD
381 if (test_steal)
382 worker_flags |= URCU_WORKER_STEAL;
383 urcu_workqueue_init(&workqueue, max_queue_len, worker_flags);
1abec5a0
MD
384
385 next_aff = 0;
386
387 for (i = 0; i < nr_dispatchers; i++) {
388 err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
f00ae188 389 &count_dispatcher[2 * i]);
1abec5a0
MD
390 if (err != 0)
391 exit(1);
392 }
393 for (i = 0; i < nr_workers; i++) {
394 err = pthread_create(&tid_worker[i], NULL, thr_worker,
395 &count_worker[i]);
396 if (err != 0)
397 exit(1);
398 }
399
400 cmm_smp_mb();
401
402 test_go = 1;
403
404 for (i = 0; i < duration; i++) {
405 sleep(1);
406 if (verbose_mode)
407 (void) write(1, ".", 1);
408 }
409
410 test_stop_enqueue = 1;
411 while (nr_dispatchers != uatomic_read(&test_enqueue_stopped)) {
412 sleep(1);
413 }
414
415 if (test_wait_empty) {
416 while (!cds_wfcq_empty(&workqueue.head, &workqueue.tail)) {
417 sleep(1);
418 }
419 }
1b0a9891 420 urcu_workqueue_shutdown(&workqueue);
1abec5a0
MD
421
422 for (i = 0; i < nr_dispatchers; i++) {
423 err = pthread_join(tid_dispatcher[i], &tret);
424 if (err != 0)
425 exit(1);
f00ae188
MD
426 tot_incoming += count_dispatcher[2 * i];
427 tot_discard += count_dispatcher[(2 * i) + 1];
1abec5a0
MD
428 }
429 for (i = 0; i < nr_workers; i++) {
430 err = pthread_join(tid_worker[i], &tret);
431 if (err != 0)
432 exit(1);
f00ae188 433 tot_work_done += count_worker[i];
1abec5a0
MD
434 }
435
436 printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
437 "work_loops %lu nr_workers %3u "
f00ae188
MD
438 "nr_incoming %12llu nr_work_done %12llu nr_discard %12llu "
439 "max_queue_len %lu work_stealing %s\n",
1abec5a0 440 argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
f00ae188
MD
441 nr_workers, tot_incoming, tot_work_done, tot_discard,
442 max_queue_len, test_steal ? "enabled" : "disabled");
443 if (nr_incoming != nr_work_done + nr_discard) {
444 printf("ERROR: nr_incoming does not match sum of work done and discard.\n");
445 retval = -1;
446 }
1abec5a0
MD
447 free(count_dispatcher);
448 free(count_worker);
449 free(tid_dispatcher);
450 free(tid_worker);
451 return retval;
452}
This page took 0.041011 seconds and 4 git commands to generate.