workqueue: handle congestion by clearing queue
[userspace-rcu.git] / tests / benchmark / test_urcu_workqueue.c
1 /*
2 * test_urcu_workqueue.c
3 *
4 * Userspace RCU library - workqueue test
5 *
6 * Copyright February 2010-2014 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
8 *
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write to the Free Software Foundation, Inc.,
21 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 */
23
24 #define _GNU_SOURCE
25 #include "config.h"
26 #include <stdio.h>
27 #include <pthread.h>
28 #include <stdlib.h>
29 #include <stdint.h>
30 #include <stdbool.h>
31 #include <string.h>
32 #include <sys/types.h>
33 #include <sys/wait.h>
34 #include <unistd.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <errno.h>
38
39 #include <urcu/arch.h>
40 #include <urcu/tls-compat.h>
41 #include <urcu/uatomic.h>
42 #include "cpuset.h"
43 #include "thread-id.h"
44
45 /* hardcoded number of CPUs */
46 #define NR_CPUS 16384
47
48 #ifndef DYNAMIC_LINK_TEST
49 #define _LGPL_SOURCE
50 #endif
51 #include <urcu.h>
52 #include <urcu/wfstack.h>
53 #include <urcu/workqueue-fifo.h>
54
/* Start flag: worker and dispatcher threads spin until main() sets it. */
static volatile int test_go;
/* Stop flag: dispatchers leave their enqueue loop once main() sets it. */
static volatile int test_stop_enqueue;

/* Tunables parsed from the command line (see show_usage()). */
static unsigned long duration;			/* test length, in seconds */
static unsigned long work_loops;		/* busy loops per work item (-c) */
static unsigned long dispatch_delay_loops;	/* busy loops between enqueues (-d) */
static unsigned long max_queue_len;		/* queue capacity, 0 = unbounded (-m) */
static int test_steal;				/* work stealing between workers (-s) */
66
/*
 * Busy-wait for @loops iterations, issuing a CPU relax hint on each one.
 * Used to simulate dispatch and work cost without sleeping.
 */
static inline void loop_sleep(unsigned long loops)
{
	unsigned long remaining;

	for (remaining = loops; remaining > 0; remaining--)
		caa_cpu_relax();
}
72
/* Set by -v: enables printf_verbose() output on stderr. */
static int verbose_mode;

/* Set by -w: main() waits for the global workqueue to drain before shutdown. */
static int test_wait_empty;

/* Count of dispatcher threads that have left their enqueue loop (atomic). */
static int test_enqueue_stopped;
77
/*
 * Debug trace to stderr, emitted only in verbose mode (-v).
 * The arguments are not evaluated at all when verbose mode is off.
 */
#define printf_verbose(...)				\
	do {						\
		if (verbose_mode)			\
			fprintf(stderr, __VA_ARGS__);	\
	} while (0)
83
84 static unsigned int cpu_affinities[NR_CPUS];
85 static unsigned int next_aff = 0;
86 static int use_affinity = 0;
87
88 pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;
89
/*
 * Pin the calling thread to the next CPU from the -a list.
 *
 * Does nothing unless at least one -a option was given, or when the
 * build lacks sched_setaffinity(). Exits the process if the affinity
 * mutex cannot be locked or unlocked. The sched_setaffinity() result
 * is not checked.
 */
static void set_affinity(void)
{
#if HAVE_SCHED_SETAFFINITY
	cpu_set_t cpu_mask;
	int target_cpu, err;

	if (!use_affinity)
		return;

	err = pthread_mutex_lock(&affinity_mutex);
	if (err) {
		perror("Error in pthread mutex lock");
		exit(-1);
	}
	target_cpu = cpu_affinities[next_aff++];
	err = pthread_mutex_unlock(&affinity_mutex);
	if (err) {
		perror("Error in pthread mutex unlock");
		exit(-1);
	}

	CPU_ZERO(&cpu_mask);
	CPU_SET(target_cpu, &cpu_mask);
# if SCHED_SETAFFINITY_ARGS == 2
	sched_setaffinity(0, &cpu_mask);
# else
	sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask);
# endif
#endif /* HAVE_SCHED_SETAFFINITY */
}
122
123 /*
124 * returns 0 if test should end.
125 */
126 static int test_duration_enqueue(void)
127 {
128 return !test_stop_enqueue;
129 }
130
131 static DEFINE_URCU_TLS(unsigned long long, nr_work_done);
132 static DEFINE_URCU_TLS(unsigned long long, nr_incoming);
133 static DEFINE_URCU_TLS(unsigned long long, nr_discard);
134
135 static unsigned int nr_dispatchers;
136 static unsigned int nr_workers;
137
138 static struct urcu_workqueue workqueue;
139
140 struct test_work {
141 struct urcu_work w;
142 };
143
144 static
145 void discard_queue(struct urcu_workqueue *queue)
146 {
147 struct urcu_worker dummy_worker;
148
149 urcu_worker_init(queue, &dummy_worker);
150 if (!urcu_workqueue_steal_all(queue, &dummy_worker))
151 return;
152 for (;;) {
153 struct urcu_work *work;
154 struct test_work *t;
155
156 work = urcu_dequeue_work(&dummy_worker);
157 if (!work)
158 break;
159 t = caa_container_of(work, struct test_work, w);
160 printf_verbose("discard work %p\n", t);
161 URCU_TLS(nr_discard)++;
162 free(t);
163 }
164 }
165
166 static void *thr_dispatcher(void *_count)
167 {
168 unsigned long long *count = _count;
169 bool was_nonempty;
170
171 printf_verbose("thread_begin %s, tid %lu\n",
172 "dispatcher", urcu_get_thread_id());
173
174 set_affinity();
175
176 while (!test_go)
177 {
178 }
179 cmm_smp_mb();
180
181 for (;;) {
182 struct test_work *work = malloc(sizeof(*work));
183 enum urcu_enqueue_ret ret;
184
185 if (!work)
186 goto fail;
187 printf_verbose("incoming work %p\n", work);
188 URCU_TLS(nr_incoming)++;
189 ret = urcu_queue_work(&workqueue, &work->w);
190 if (ret == URCU_ENQUEUE_FULL) {
191 printf_verbose("queue work %p (queue full)\n", work);
192 printf_verbose("discard work %p\n", work);
193 URCU_TLS(nr_discard)++;
194 free(work);
195 discard_queue(&workqueue);
196 continue;
197 }
198 printf_verbose("queue work %p (ok)\n", work);
199
200 if (caa_unlikely(dispatch_delay_loops))
201 loop_sleep(dispatch_delay_loops);
202 fail:
203 if (caa_unlikely(!test_duration_enqueue()))
204 break;
205 }
206
207 uatomic_inc(&test_enqueue_stopped);
208 count[0] = URCU_TLS(nr_incoming);
209 count[1] = URCU_TLS(nr_discard);
210 printf_verbose("dispatcher thread_end, tid %lu, "
211 "incoming %llu discard %llu\n",
212 urcu_get_thread_id(),
213 URCU_TLS(nr_incoming),
214 URCU_TLS(nr_discard));
215 return ((void*)1);
216 }
217
218 static void *thr_worker(void *_count)
219 {
220 unsigned long long *count = _count;
221 unsigned int counter = 0;
222 struct urcu_worker worker;
223
224 printf_verbose("thread_begin %s, tid %lu\n",
225 "worker", urcu_get_thread_id());
226
227 set_affinity();
228
229 rcu_register_thread();
230 urcu_worker_init(&workqueue, &worker);
231 urcu_worker_register(&workqueue, &worker);
232
233 while (!test_go)
234 {
235 }
236 cmm_smp_mb();
237
238 for (;;) {
239 enum urcu_accept_ret ret;
240
241 ret = urcu_accept_work(&worker);
242 if (ret == URCU_ACCEPT_SHUTDOWN)
243 break;
244 for (;;) {
245 struct urcu_work *work;
246 struct test_work *t;
247
248 work = urcu_dequeue_work(&worker);
249 if (!work)
250 break;
251 t = caa_container_of(work, struct test_work, w);
252 printf_verbose("dequeue work %p\n", t);
253 URCU_TLS(nr_work_done)++;
254 if (caa_unlikely(work_loops))
255 loop_sleep(work_loops);
256 free(t);
257 }
258 }
259 end:
260 urcu_worker_unregister(&workqueue, &worker);
261 rcu_unregister_thread();
262
263 printf_verbose("worker thread_end, tid %lu, "
264 "dequeues %llu\n",
265 urcu_get_thread_id(),
266 URCU_TLS(nr_work_done));
267 count[0] = URCU_TLS(nr_work_done);
268 return ((void*)2);
269 }
270
/*
 * Print command-line usage to stdout. argv[0] is shown as the program
 * name; @argc is unused.
 */
static void show_usage(int argc, char **argv)
{
	printf("Usage : %s nr_workers nr_dispatchers duration (s) <OPTIONS>\n",
		argv[0]);
	printf("OPTIONS:\n");
	printf(" [-d delay] (dispatcher period (in loops))\n");
	printf(" [-c duration] (worker period (in loops))\n");
	printf(" [-v] (verbose output)\n");
	printf(" [-a cpu#] [-a cpu#]... (affinity)\n");
	printf(" [-w] Wait for workers to empty workqueue\n");
	printf(" [-m len] (Max queue length. 0 means infinite.)\n");
	printf(" [-s] (Enable work-stealing between workers.)\n");
	printf("\n");
}
285
286 int main(int argc, char **argv)
287 {
288 int err;
289 pthread_t *tid_dispatcher, *tid_worker;
290 void *tret;
291 unsigned long long *count_dispatcher, *count_worker;
292 unsigned long long tot_incoming = 0, tot_work_done = 0, tot_discard = 0;
293 int i, a, retval = 0;
294 int worker_flags = 0;
295
296 if (argc < 4) {
297 show_usage(argc, argv);
298 return -1;
299 }
300
301 err = sscanf(argv[1], "%u", &nr_workers);
302 if (err != 1) {
303 show_usage(argc, argv);
304 return -1;
305 }
306
307 err = sscanf(argv[2], "%u", &nr_dispatchers);
308 if (err != 1) {
309 show_usage(argc, argv);
310 return -1;
311 }
312
313 err = sscanf(argv[3], "%lu", &duration);
314 if (err != 1) {
315 show_usage(argc, argv);
316 return -1;
317 }
318
319 for (i = 4; i < argc; i++) {
320 if (argv[i][0] != '-')
321 continue;
322 switch (argv[i][1]) {
323 case 'a':
324 if (argc < i + 2) {
325 show_usage(argc, argv);
326 return -1;
327 }
328 a = atoi(argv[++i]);
329 cpu_affinities[next_aff++] = a;
330 use_affinity = 1;
331 printf_verbose("Adding CPU %d affinity\n", a);
332 break;
333 case 'm':
334 if (argc < i + 2) {
335 show_usage(argc, argv);
336 return -1;
337 }
338 max_queue_len = atol(argv[++i]);
339 break;
340 case 'c':
341 if (argc < i + 2) {
342 show_usage(argc, argv);
343 return -1;
344 }
345 work_loops = atol(argv[++i]);
346 break;
347 case 'd':
348 if (argc < i + 2) {
349 show_usage(argc, argv);
350 return -1;
351 }
352 dispatch_delay_loops = atol(argv[++i]);
353 break;
354 case 'v':
355 verbose_mode = 1;
356 break;
357 case 'w':
358 test_wait_empty = 1;
359 break;
360 case 's':
361 test_steal = 1;
362 break;
363 }
364 }
365
366 printf_verbose("running test for %lu seconds, %u dispatchers, "
367 "%u workers.\n",
368 duration, nr_dispatchers, nr_workers);
369 if (test_wait_empty)
370 printf_verbose("Wait for workers to empty workqueue.\n");
371 printf_verbose("Work duration: %lu loops.\n", work_loops);
372 printf_verbose("Dispatcher arrival delay: %lu loops.\n", dispatch_delay_loops);
373 printf_verbose("thread %-6s, tid %lu\n",
374 "main", urcu_get_thread_id());
375
376 tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
377 tid_worker = calloc(nr_workers, sizeof(*tid_worker));
378 count_dispatcher = calloc(nr_dispatchers,
379 2 * sizeof(*count_dispatcher));
380 count_worker = calloc(nr_workers, sizeof(*count_worker));
381 if (test_steal)
382 worker_flags |= URCU_WORKER_STEAL;
383 urcu_workqueue_init(&workqueue, max_queue_len, worker_flags);
384
385 next_aff = 0;
386
387 for (i = 0; i < nr_dispatchers; i++) {
388 err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
389 &count_dispatcher[2 * i]);
390 if (err != 0)
391 exit(1);
392 }
393 for (i = 0; i < nr_workers; i++) {
394 err = pthread_create(&tid_worker[i], NULL, thr_worker,
395 &count_worker[i]);
396 if (err != 0)
397 exit(1);
398 }
399
400 cmm_smp_mb();
401
402 test_go = 1;
403
404 for (i = 0; i < duration; i++) {
405 sleep(1);
406 if (verbose_mode)
407 (void) write(1, ".", 1);
408 }
409
410 test_stop_enqueue = 1;
411 while (nr_dispatchers != uatomic_read(&test_enqueue_stopped)) {
412 sleep(1);
413 }
414
415 if (test_wait_empty) {
416 while (!cds_wfcq_empty(&workqueue.head, &workqueue.tail)) {
417 sleep(1);
418 }
419 }
420 urcu_workqueue_shutdown(&workqueue);
421
422 for (i = 0; i < nr_dispatchers; i++) {
423 err = pthread_join(tid_dispatcher[i], &tret);
424 if (err != 0)
425 exit(1);
426 tot_incoming += count_dispatcher[2 * i];
427 tot_discard += count_dispatcher[(2 * i) + 1];
428 }
429 for (i = 0; i < nr_workers; i++) {
430 err = pthread_join(tid_worker[i], &tret);
431 if (err != 0)
432 exit(1);
433 tot_work_done += count_worker[i];
434 }
435
436 printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
437 "work_loops %lu nr_workers %3u "
438 "nr_incoming %12llu nr_work_done %12llu nr_discard %12llu "
439 "max_queue_len %lu work_stealing %s\n",
440 argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
441 nr_workers, tot_incoming, tot_work_done, tot_discard,
442 max_queue_len, test_steal ? "enabled" : "disabled");
443 if (nr_incoming != nr_work_done + nr_discard) {
444 printf("ERROR: nr_incoming does not match sum of work done and discard.\n");
445 retval = -1;
446 }
447 free(count_dispatcher);
448 free(count_worker);
449 free(tid_dispatcher);
450 free(tid_worker);
451 return retval;
452 }
This page took 0.045417 seconds and 4 git commands to generate.