workqueue: implement benchmark
[userspace-rcu.git] / tests / benchmark / test_urcu_workqueue.c
1 /*
2 * test_urcu_workqueue.c
3 *
4 * Userspace RCU library - workqueue test
5 *
6 * Copyright February 2010-2014 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
8 *
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write to the Free Software Foundation, Inc.,
21 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 */
23
24 #define _GNU_SOURCE
25 #include "config.h"
26 #include <stdio.h>
27 #include <pthread.h>
28 #include <stdlib.h>
29 #include <stdint.h>
30 #include <stdbool.h>
31 #include <string.h>
32 #include <sys/types.h>
33 #include <sys/wait.h>
34 #include <unistd.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <errno.h>
38
39 #include <urcu/arch.h>
40 #include <urcu/tls-compat.h>
41 #include <urcu/uatomic.h>
42 #include "cpuset.h"
43 #include "thread-id.h"
44
/* Hardcoded upper bound on the number of CPUs; sizes the cpu_affinities table. */
#define NR_CPUS 16384
47
48 #ifndef DYNAMIC_LINK_TEST
49 #define _LGPL_SOURCE
50 #endif
51 #include <urcu.h>
52 #include <urcu/wfstack.h>
53 #include <urcu/workqueue-fifo.h>
54
/*
 * Test phase flags, written by main() and polled by the threads:
 *  - test_go: dispatchers and workers spin until this becomes nonzero;
 *  - test_stop_enqueue: once set, dispatchers leave their enqueue loop;
 *  - test_stop_dequeue: once set, workers exit after a batch with no work.
 */
static volatile int test_go, test_stop_enqueue, test_stop_dequeue;

/* Busy-loop iterations a worker spends on each dequeued item (-c option). */
static unsigned long work_loops;

/* Test run time in seconds (third positional argument). */
static unsigned long duration;

/* Busy-loop iterations a dispatcher spends after each enqueue (-d option). */
static unsigned long dispatch_delay_loops;
62
/*
 * Busy-wait for `loops` iterations, calling caa_cpu_relax() once per
 * iteration. A count of zero returns immediately.
 */
static inline void loop_sleep(unsigned long loops)
{
	unsigned long remaining;

	for (remaining = loops; remaining != 0; remaining--)
		caa_cpu_relax();
}
68
/* Nonzero when -v was given; gates printf_verbose() output. */
static int verbose_mode;

/* Nonzero when -w was given: main() waits for the workqueue to be empty before stopping the workers. */
static int test_wait_empty;
/* Count of dispatcher threads that have left their enqueue loop (incremented with uatomic_inc). */
static int test_enqueue_stopped;
73
/*
 * printf-style trace helper: formats the message to stderr when
 * verbose_mode is nonzero, and does nothing otherwise.
 */
#define printf_verbose(fmt, ...)					\
	do {								\
		if (verbose_mode != 0)					\
			fprintf(stderr, fmt, ## __VA_ARGS__);		\
	} while (0)
79
/* CPU numbers collected from the -a options, in command-line order. */
static unsigned int cpu_affinities[NR_CPUS];
/* Next cpu_affinities slot: filled during option parsing, then reset and handed out by set_affinity(). */
static unsigned int next_aff = 0;
/* Set to 1 once at least one -a option has been parsed. */
static int use_affinity = 0;

/* Protects next_aff while threads pick their CPU in set_affinity(). */
pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;
85
86 static void set_affinity(void)
87 {
88 #if HAVE_SCHED_SETAFFINITY
89 cpu_set_t mask;
90 int cpu, ret;
91 #endif /* HAVE_SCHED_SETAFFINITY */
92
93 if (!use_affinity)
94 return;
95
96 #if HAVE_SCHED_SETAFFINITY
97 ret = pthread_mutex_lock(&affinity_mutex);
98 if (ret) {
99 perror("Error in pthread mutex lock");
100 exit(-1);
101 }
102 cpu = cpu_affinities[next_aff++];
103 ret = pthread_mutex_unlock(&affinity_mutex);
104 if (ret) {
105 perror("Error in pthread mutex unlock");
106 exit(-1);
107 }
108
109 CPU_ZERO(&mask);
110 CPU_SET(cpu, &mask);
111 #if SCHED_SETAFFINITY_ARGS == 2
112 sched_setaffinity(0, &mask);
113 #else
114 sched_setaffinity(0, sizeof(mask), &mask);
115 #endif
116 #endif /* HAVE_SCHED_SETAFFINITY */
117 }
118
119 /*
120 * returns 0 if test should end.
121 */
122 static int test_duration_dequeue(void)
123 {
124 return !test_stop_dequeue;
125 }
126
127 static int test_duration_enqueue(void)
128 {
129 return !test_stop_enqueue;
130 }
131
/*
 * Per-thread operation counters. Each thread copies its counter into
 * the slot it was given at creation just before returning.
 */
static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);

/* Thread counts: nr_workers is argv[1], nr_dispatchers is argv[2]. */
static unsigned int nr_dispatchers;
static unsigned int nr_workers;

/* The single workqueue shared by every dispatcher and worker thread. */
static struct urcu_workqueue workqueue;

/* Work item: no payload, only the embedded urcu_work node. */
struct test_work {
	struct urcu_work w;
};
143
144 static void *thr_dispatcher(void *_count)
145 {
146 unsigned long long *count = _count;
147 bool was_nonempty;
148
149 printf_verbose("thread_begin %s, tid %lu\n",
150 "dispatcher", urcu_get_thread_id());
151
152 set_affinity();
153
154 while (!test_go)
155 {
156 }
157 cmm_smp_mb();
158
159 for (;;) {
160 struct test_work *work = malloc(sizeof(*work));
161 if (!work)
162 goto fail;
163 printf_verbose("queue work %p\n", work);
164 urcu_queue_work(&workqueue, &work->w);
165 URCU_TLS(nr_enqueues)++;
166
167 if (caa_unlikely(dispatch_delay_loops))
168 loop_sleep(dispatch_delay_loops);
169 fail:
170 if (caa_unlikely(!test_duration_enqueue()))
171 break;
172 }
173
174 uatomic_inc(&test_enqueue_stopped);
175 count[0] = URCU_TLS(nr_enqueues);
176 printf_verbose("dispatcher thread_end, tid %lu, "
177 "enqueues %llu\n",
178 urcu_get_thread_id(),
179 URCU_TLS(nr_enqueues));
180 return ((void*)1);
181 }
182
183 static void *thr_worker(void *_count)
184 {
185 unsigned long long *count = _count;
186 unsigned int counter = 0;
187 struct urcu_worker worker;
188 int blocking = 1;
189
190 printf_verbose("thread_begin %s, tid %lu\n",
191 "worker", urcu_get_thread_id());
192
193 set_affinity();
194
195 rcu_register_thread();
196 urcu_worker_init(&worker, URCU_WORKER_STEAL);
197 //urcu_worker_init(&worker, 0);
198 urcu_worker_register(&workqueue, &worker);
199
200 while (!test_go)
201 {
202 }
203 cmm_smp_mb();
204
205 for (;;) {
206 int batch_work_count = 0;
207
208 urcu_accept_work(&workqueue, &worker, blocking);
209 for (;;) {
210 struct urcu_work *work;
211 struct test_work *t;
212
213 work = urcu_dequeue_work(&worker);
214 if (!work)
215 break;
216 t = caa_container_of(work, struct test_work, w);
217 printf_verbose("dequeue work %p\n", t);
218 batch_work_count++;
219 URCU_TLS(nr_dequeues)++;
220 if (caa_unlikely(work_loops))
221 loop_sleep(work_loops);
222 free(t);
223 }
224 if (!test_duration_dequeue())
225 blocking = 0;
226 if (caa_unlikely(!test_duration_dequeue()
227 && !batch_work_count))
228 break;
229 }
230 end:
231 urcu_worker_unregister(&workqueue, &worker);
232 rcu_unregister_thread();
233
234 printf_verbose("worker thread_end, tid %lu, "
235 "dequeues %llu\n",
236 urcu_get_thread_id(),
237 URCU_TLS(nr_dequeues));
238 count[0] = URCU_TLS(nr_dequeues);
239 return ((void*)2);
240 }
241
/*
 * Print the command-line synopsis followed by the option list to
 * stdout. argv[0] is used as the program name; argc is not used.
 */
static void show_usage(int argc, char **argv)
{
	static const char *const option_help[] = {
		"OPTIONS:\n",
		" [-d delay] (dispatcher period (in loops))\n",
		" [-c duration] (worker period (in loops))\n",
		" [-v] (verbose output)\n",
		" [-a cpu#] [-a cpu#]... (affinity)\n",
		" [-w] Wait for worker to empty stack\n",
		"\n",
	};
	size_t idx;

	printf("Usage : %s nr_workers nr_dispatchers duration (s) <OPTIONS>\n",
		argv[0]);
	for (idx = 0; idx < sizeof(option_help) / sizeof(option_help[0]); idx++)
		fputs(option_help[idx], stdout);
}
254
255 int main(int argc, char **argv)
256 {
257 int err;
258 pthread_t *tid_dispatcher, *tid_worker;
259 void *tret;
260 unsigned long long *count_dispatcher, *count_worker;
261 unsigned long long tot_enqueues = 0, tot_dequeues = 0;
262 unsigned long long end_dequeues = 0;
263 int i, a, retval = 0;
264
265 if (argc < 4) {
266 show_usage(argc, argv);
267 return -1;
268 }
269
270 err = sscanf(argv[1], "%u", &nr_workers);
271 if (err != 1) {
272 show_usage(argc, argv);
273 return -1;
274 }
275
276 err = sscanf(argv[2], "%u", &nr_dispatchers);
277 if (err != 1) {
278 show_usage(argc, argv);
279 return -1;
280 }
281
282 err = sscanf(argv[3], "%lu", &duration);
283 if (err != 1) {
284 show_usage(argc, argv);
285 return -1;
286 }
287
288 for (i = 4; i < argc; i++) {
289 if (argv[i][0] != '-')
290 continue;
291 switch (argv[i][1]) {
292 case 'a':
293 if (argc < i + 2) {
294 show_usage(argc, argv);
295 return -1;
296 }
297 a = atoi(argv[++i]);
298 cpu_affinities[next_aff++] = a;
299 use_affinity = 1;
300 printf_verbose("Adding CPU %d affinity\n", a);
301 break;
302 case 'c':
303 if (argc < i + 2) {
304 show_usage(argc, argv);
305 return -1;
306 }
307 work_loops = atol(argv[++i]);
308 break;
309 case 'd':
310 if (argc < i + 2) {
311 show_usage(argc, argv);
312 return -1;
313 }
314 dispatch_delay_loops = atol(argv[++i]);
315 break;
316 case 'v':
317 verbose_mode = 1;
318 break;
319 case 'w':
320 test_wait_empty = 1;
321 break;
322 }
323 }
324
325 printf_verbose("running test for %lu seconds, %u dispatchers, "
326 "%u workers.\n",
327 duration, nr_dispatchers, nr_workers);
328 if (test_wait_empty)
329 printf_verbose("Wait for workers to empty workqueue.\n");
330 printf_verbose("Work duration: %lu loops.\n", work_loops);
331 printf_verbose("Dispatcher arrival delay: %lu loops.\n", dispatch_delay_loops);
332 printf_verbose("thread %-6s, tid %lu\n",
333 "main", urcu_get_thread_id());
334
335 tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
336 tid_worker = calloc(nr_workers, sizeof(*tid_worker));
337 count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
338 count_worker = calloc(nr_workers, sizeof(*count_worker));
339 urcu_workqueue_init(&workqueue);
340
341 next_aff = 0;
342
343 for (i = 0; i < nr_dispatchers; i++) {
344 err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
345 &count_dispatcher[i]);
346 if (err != 0)
347 exit(1);
348 }
349 for (i = 0; i < nr_workers; i++) {
350 err = pthread_create(&tid_worker[i], NULL, thr_worker,
351 &count_worker[i]);
352 if (err != 0)
353 exit(1);
354 }
355
356 cmm_smp_mb();
357
358 test_go = 1;
359
360 for (i = 0; i < duration; i++) {
361 sleep(1);
362 if (verbose_mode)
363 (void) write(1, ".", 1);
364 }
365
366 test_stop_enqueue = 1;
367 while (nr_dispatchers != uatomic_read(&test_enqueue_stopped)) {
368 sleep(1);
369 }
370
371 if (test_wait_empty) {
372 while (!cds_wfcq_empty(&workqueue.head, &workqueue.tail)) {
373 sleep(1);
374 }
375 }
376 test_stop_dequeue = 1;
377
378 /* Send finish to all workers */
379 urcu_workqueue_wakeup_all(&workqueue);
380
381 for (i = 0; i < nr_dispatchers; i++) {
382 err = pthread_join(tid_dispatcher[i], &tret);
383 if (err != 0)
384 exit(1);
385 tot_enqueues += count_dispatcher[i];
386 }
387 for (i = 0; i < nr_workers; i++) {
388 err = pthread_join(tid_worker[i], &tret);
389 if (err != 0)
390 exit(1);
391 tot_dequeues += count_worker[i];
392 }
393
394 printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
395 "work_loops %lu nr_workers %3u "
396 "nr_enqueues %12llu nr_dequeues %12llu\n",
397 argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
398 nr_workers, tot_enqueues, tot_dequeues);
399 free(count_dispatcher);
400 free(count_worker);
401 free(tid_dispatcher);
402 free(tid_worker);
403 return retval;
404 }
This page took 0.037862 seconds and 4 git commands to generate.