workqueue: move work stealing flag to workqueue
[userspace-rcu.git] / tests / benchmark / test_urcu_workqueue.c
1 /*
2 * test_urcu_workqueue.c
3 *
4 * Userspace RCU library - workqueue test
5 *
6 * Copyright February 2010-2014 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
8 *
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write to the Free Software Foundation, Inc.,
21 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 */
23
24 #define _GNU_SOURCE
25 #include "config.h"
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <poll.h>
38
39 #include <urcu/arch.h>
40 #include <urcu/tls-compat.h>
41 #include <urcu/uatomic.h>
42 #include "cpuset.h"
43 #include "thread-id.h"
44
45 /* hardcoded number of CPUs */
46 #define NR_CPUS 16384
47
48 #ifndef DYNAMIC_LINK_TEST
49 #define _LGPL_SOURCE
50 #endif
51 #include <urcu.h>
52 #include <urcu/wfstack.h>
53 #include <urcu/workqueue-fifo.h>
54
/*
 * Run-control flags written by main(): test_go releases every thread at
 * once, test_stop_enqueue asks the dispatchers to stop enqueueing.
 */
static volatile int test_go;
static volatile int test_stop_enqueue;

/* Benchmark parameters, filled in from the command line by main(). */
static unsigned long work_loops;		/* -c: busy loops per work item */
static unsigned long duration;			/* enqueue phase length, seconds */
static unsigned long dispatch_delay_loops;	/* -d: busy loops between enqueues */
static unsigned long max_queue_len;		/* -m: 0 means infinite */
static int test_steal;				/* -s: work stealing requested */
66
/*
 * Busy-wait for "loops" iterations, issuing caa_cpu_relax() once per
 * iteration; used to simulate dispatcher delay and per-item work cost.
 */
static inline void loop_sleep(unsigned long loops)
{
	unsigned long iter;

	for (iter = 0; iter < loops; iter++)
		caa_cpu_relax();
}
72
/* -v: enables printf_verbose() output. */
static int verbose_mode;

/* -w: drain the workqueue before shutting it down. */
static int test_wait_empty;
/* Number of dispatcher threads that have left their enqueue loop. */
static int test_enqueue_stopped;

/* fprintf() to stderr, but only when verbose mode (-v) is enabled. */
#define printf_verbose(fmt, ...)				\
	do {							\
		if (verbose_mode)				\
			fprintf(stderr, fmt, ##__VA_ARGS__);	\
	} while (0)
83
84 static unsigned int cpu_affinities[NR_CPUS];
85 static unsigned int next_aff = 0;
86 static int use_affinity = 0;
87
88 pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;
89
90 static void set_affinity(void)
91 {
92 #if HAVE_SCHED_SETAFFINITY
93 cpu_set_t mask;
94 int cpu, ret;
95 #endif /* HAVE_SCHED_SETAFFINITY */
96
97 if (!use_affinity)
98 return;
99
100 #if HAVE_SCHED_SETAFFINITY
101 ret = pthread_mutex_lock(&affinity_mutex);
102 if (ret) {
103 perror("Error in pthread mutex lock");
104 exit(-1);
105 }
106 cpu = cpu_affinities[next_aff++];
107 ret = pthread_mutex_unlock(&affinity_mutex);
108 if (ret) {
109 perror("Error in pthread mutex unlock");
110 exit(-1);
111 }
112
113 CPU_ZERO(&mask);
114 CPU_SET(cpu, &mask);
115 #if SCHED_SETAFFINITY_ARGS == 2
116 sched_setaffinity(0, &mask);
117 #else
118 sched_setaffinity(0, sizeof(mask), &mask);
119 #endif
120 #endif /* HAVE_SCHED_SETAFFINITY */
121 }
122
123 /*
124 * returns 0 if test should end.
125 */
126 static int test_duration_enqueue(void)
127 {
128 return !test_stop_enqueue;
129 }
130
131 static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
132 static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
133
134 static unsigned int nr_dispatchers;
135 static unsigned int nr_workers;
136
137 static struct urcu_workqueue workqueue;
138
139 struct test_work {
140 struct urcu_work w;
141 };
142
143 static void *thr_dispatcher(void *_count)
144 {
145 unsigned long long *count = _count;
146 bool was_nonempty;
147
148 printf_verbose("thread_begin %s, tid %lu\n",
149 "dispatcher", urcu_get_thread_id());
150
151 set_affinity();
152
153 while (!test_go)
154 {
155 }
156 cmm_smp_mb();
157
158 for (;;) {
159 struct test_work *work = malloc(sizeof(*work));
160 enum urcu_enqueue_ret ret;
161
162 if (!work)
163 goto fail;
164 retry:
165 printf_verbose("attempt queue work %p\n", work);
166 ret = urcu_queue_work(&workqueue, &work->w);
167 if (ret == URCU_ENQUEUE_FULL) {
168 printf_verbose("queue work %p (queue full)\n", work);
169 (void) poll(NULL, 0, 10);
170 goto retry;
171 }
172 printf_verbose("queue work %p (ok)\n", work);
173 URCU_TLS(nr_enqueues)++;
174
175 if (caa_unlikely(dispatch_delay_loops))
176 loop_sleep(dispatch_delay_loops);
177 fail:
178 if (caa_unlikely(!test_duration_enqueue()))
179 break;
180 }
181
182 uatomic_inc(&test_enqueue_stopped);
183 count[0] = URCU_TLS(nr_enqueues);
184 printf_verbose("dispatcher thread_end, tid %lu, "
185 "enqueues %llu\n",
186 urcu_get_thread_id(),
187 URCU_TLS(nr_enqueues));
188 return ((void*)1);
189 }
190
191 static void *thr_worker(void *_count)
192 {
193 unsigned long long *count = _count;
194 unsigned int counter = 0;
195 struct urcu_worker worker;
196
197 printf_verbose("thread_begin %s, tid %lu\n",
198 "worker", urcu_get_thread_id());
199
200 set_affinity();
201
202 rcu_register_thread();
203 urcu_worker_init(&workqueue, &worker);
204 urcu_worker_register(&workqueue, &worker);
205
206 while (!test_go)
207 {
208 }
209 cmm_smp_mb();
210
211 for (;;) {
212 enum urcu_accept_ret ret;
213
214 ret = urcu_accept_work(&worker);
215 if (ret == URCU_ACCEPT_SHUTDOWN)
216 break;
217 for (;;) {
218 struct urcu_work *work;
219 struct test_work *t;
220
221 work = urcu_dequeue_work(&worker);
222 if (!work)
223 break;
224 t = caa_container_of(work, struct test_work, w);
225 printf_verbose("dequeue work %p\n", t);
226 URCU_TLS(nr_dequeues)++;
227 if (caa_unlikely(work_loops))
228 loop_sleep(work_loops);
229 free(t);
230 }
231 }
232 end:
233 urcu_worker_unregister(&workqueue, &worker);
234 rcu_unregister_thread();
235
236 printf_verbose("worker thread_end, tid %lu, "
237 "dequeues %llu\n",
238 urcu_get_thread_id(),
239 URCU_TLS(nr_dequeues));
240 count[0] = URCU_TLS(nr_dequeues);
241 return ((void*)2);
242 }
243
/*
 * Print command-line usage to stdout, using argv[0] as the program name.
 * argc is unused and kept for signature compatibility.
 */
static void show_usage(int argc, char **argv)
{
	(void) argc;

	printf("Usage : %s nr_workers nr_dispatchers duration (s) <OPTIONS>\n",
		argv[0]);
	printf("OPTIONS:\n");
	printf(" [-d delay] (dispatcher period (in loops))\n");
	printf(" [-c duration] (worker period (in loops))\n");
	printf(" [-v] (verbose output)\n");
	printf(" [-a cpu#] [-a cpu#]... (affinity)\n");
	printf(" [-w] (Wait for workers to empty workqueue)\n");
	printf(" [-m len] (Max queue length. 0 means infinite.)\n");
	printf(" [-s] (Enable work-stealing between workers.)\n");
	printf("\n");
}
258
259 int main(int argc, char **argv)
260 {
261 int err;
262 pthread_t *tid_dispatcher, *tid_worker;
263 void *tret;
264 unsigned long long *count_dispatcher, *count_worker;
265 unsigned long long tot_enqueues = 0, tot_dequeues = 0;
266 unsigned long long end_dequeues = 0;
267 int i, a, retval = 0;
268 int worker_flags = 0;
269
270 if (argc < 4) {
271 show_usage(argc, argv);
272 return -1;
273 }
274
275 err = sscanf(argv[1], "%u", &nr_workers);
276 if (err != 1) {
277 show_usage(argc, argv);
278 return -1;
279 }
280
281 err = sscanf(argv[2], "%u", &nr_dispatchers);
282 if (err != 1) {
283 show_usage(argc, argv);
284 return -1;
285 }
286
287 err = sscanf(argv[3], "%lu", &duration);
288 if (err != 1) {
289 show_usage(argc, argv);
290 return -1;
291 }
292
293 for (i = 4; i < argc; i++) {
294 if (argv[i][0] != '-')
295 continue;
296 switch (argv[i][1]) {
297 case 'a':
298 if (argc < i + 2) {
299 show_usage(argc, argv);
300 return -1;
301 }
302 a = atoi(argv[++i]);
303 cpu_affinities[next_aff++] = a;
304 use_affinity = 1;
305 printf_verbose("Adding CPU %d affinity\n", a);
306 break;
307 case 'm':
308 if (argc < i + 2) {
309 show_usage(argc, argv);
310 return -1;
311 }
312 max_queue_len = atol(argv[++i]);
313 break;
314 case 'c':
315 if (argc < i + 2) {
316 show_usage(argc, argv);
317 return -1;
318 }
319 work_loops = atol(argv[++i]);
320 break;
321 case 'd':
322 if (argc < i + 2) {
323 show_usage(argc, argv);
324 return -1;
325 }
326 dispatch_delay_loops = atol(argv[++i]);
327 break;
328 case 'v':
329 verbose_mode = 1;
330 break;
331 case 'w':
332 test_wait_empty = 1;
333 break;
334 case 's':
335 test_steal = 1;
336 break;
337 }
338 }
339
340 printf_verbose("running test for %lu seconds, %u dispatchers, "
341 "%u workers.\n",
342 duration, nr_dispatchers, nr_workers);
343 if (test_wait_empty)
344 printf_verbose("Wait for workers to empty workqueue.\n");
345 printf_verbose("Work duration: %lu loops.\n", work_loops);
346 printf_verbose("Dispatcher arrival delay: %lu loops.\n", dispatch_delay_loops);
347 printf_verbose("thread %-6s, tid %lu\n",
348 "main", urcu_get_thread_id());
349
350 tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
351 tid_worker = calloc(nr_workers, sizeof(*tid_worker));
352 count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
353 count_worker = calloc(nr_workers, sizeof(*count_worker));
354 if (test_steal)
355 worker_flags |= URCU_WORKER_STEAL;
356 urcu_workqueue_init(&workqueue, max_queue_len, worker_flags);
357
358 next_aff = 0;
359
360 for (i = 0; i < nr_dispatchers; i++) {
361 err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
362 &count_dispatcher[i]);
363 if (err != 0)
364 exit(1);
365 }
366 for (i = 0; i < nr_workers; i++) {
367 err = pthread_create(&tid_worker[i], NULL, thr_worker,
368 &count_worker[i]);
369 if (err != 0)
370 exit(1);
371 }
372
373 cmm_smp_mb();
374
375 test_go = 1;
376
377 for (i = 0; i < duration; i++) {
378 sleep(1);
379 if (verbose_mode)
380 (void) write(1, ".", 1);
381 }
382
383 test_stop_enqueue = 1;
384 while (nr_dispatchers != uatomic_read(&test_enqueue_stopped)) {
385 sleep(1);
386 }
387
388 if (test_wait_empty) {
389 while (!cds_wfcq_empty(&workqueue.head, &workqueue.tail)) {
390 sleep(1);
391 }
392 }
393 urcu_workqueue_shutdown(&workqueue);
394
395 for (i = 0; i < nr_dispatchers; i++) {
396 err = pthread_join(tid_dispatcher[i], &tret);
397 if (err != 0)
398 exit(1);
399 tot_enqueues += count_dispatcher[i];
400 }
401 for (i = 0; i < nr_workers; i++) {
402 err = pthread_join(tid_worker[i], &tret);
403 if (err != 0)
404 exit(1);
405 tot_dequeues += count_worker[i];
406 }
407
408 printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
409 "work_loops %lu nr_workers %3u "
410 "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu "
411 "work_stealing %s\n",
412 argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
413 nr_workers, tot_enqueues, tot_dequeues, max_queue_len,
414 test_steal ? "enabled" : "disabled");
415 free(count_dispatcher);
416 free(count_worker);
417 free(tid_dispatcher);
418 free(tid_worker);
419 return retval;
420 }
This page took 0.037558 seconds and 4 git commands to generate.