workqueue: implement shutdown
[userspace-rcu.git] / tests / benchmark / test_urcu_workqueue.c
CommitLineData
1abec5a0
MD
/*
 * test_urcu_workqueue.c
 *
 * Userspace RCU library - workqueue test
 *
 * Copyright February 2010-2014 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
23
24#define _GNU_SOURCE
25#include "config.h"
26#include <stdio.h>
27#include <pthread.h>
28#include <stdlib.h>
29#include <stdint.h>
30#include <stdbool.h>
31#include <string.h>
32#include <sys/types.h>
33#include <sys/wait.h>
34#include <unistd.h>
35#include <stdio.h>
36#include <assert.h>
37#include <errno.h>
38
39#include <urcu/arch.h>
40#include <urcu/tls-compat.h>
41#include <urcu/uatomic.h>
42#include "cpuset.h"
43#include "thread-id.h"
44
45/* hardcoded number of CPUs */
46#define NR_CPUS 16384
47
48#ifndef DYNAMIC_LINK_TEST
49#define _LGPL_SOURCE
50#endif
51#include <urcu.h>
52#include <urcu/wfstack.h>
53#include <urcu/workqueue-fifo.h>
54
/*
 * Start/stop flags written by main() and polled by the test threads.
 * test_go releases the threads spinning at start-up (main() issues a
 * cmm_smp_mb() before setting it, the threads issue one after seeing it);
 * test_stop_enqueue tells the dispatchers to stop queueing work.
 */
static volatile int test_go, test_stop_enqueue;

/* Busy-loop iterations a worker spends on each work item (-c option). */
static unsigned long work_loops;

/* Enqueue phase duration in seconds (third positional argument). */
static unsigned long duration;

/* Busy-loop iterations a dispatcher waits between enqueues (-d option). */
static unsigned long dispatch_delay_loops;
62
/*
 * Spin for @loops iterations of caa_cpu_relax() to simulate work or a
 * delay without giving up the CPU. Returns immediately when @loops is 0.
 */
static inline void loop_sleep(unsigned long loops)
{
	while (loops-- != 0)
		caa_cpu_relax();
}
68
/* Non-zero when -v was given; enables printf_verbose() output. */
static int verbose_mode;

/* Non-zero when -w was given: wait for the workqueue to drain before shutdown. */
static int test_wait_empty;
/* Number of dispatchers that have left their enqueue loop (updated atomically). */
static int test_enqueue_stopped;
73
/*
 * fprintf() to stderr, only when verbose output was requested (-v).
 * Uses the GNU named-variadic "args..." / ", ## args" extension so a call
 * with only a format string is accepted.
 */
#define printf_verbose(fmt, args...)			\
	do {						\
		if (verbose_mode)			\
			fprintf(stderr, fmt, ## args);	\
	} while (0)
79
80static unsigned int cpu_affinities[NR_CPUS];
81static unsigned int next_aff = 0;
82static int use_affinity = 0;
83
84pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;
85
86static void set_affinity(void)
87{
88#if HAVE_SCHED_SETAFFINITY
89 cpu_set_t mask;
90 int cpu, ret;
91#endif /* HAVE_SCHED_SETAFFINITY */
92
93 if (!use_affinity)
94 return;
95
96#if HAVE_SCHED_SETAFFINITY
97 ret = pthread_mutex_lock(&affinity_mutex);
98 if (ret) {
99 perror("Error in pthread mutex lock");
100 exit(-1);
101 }
102 cpu = cpu_affinities[next_aff++];
103 ret = pthread_mutex_unlock(&affinity_mutex);
104 if (ret) {
105 perror("Error in pthread mutex unlock");
106 exit(-1);
107 }
108
109 CPU_ZERO(&mask);
110 CPU_SET(cpu, &mask);
111#if SCHED_SETAFFINITY_ARGS == 2
112 sched_setaffinity(0, &mask);
113#else
114 sched_setaffinity(0, sizeof(mask), &mask);
115#endif
116#endif /* HAVE_SCHED_SETAFFINITY */
117}
118
119/*
120 * returns 0 if test should end.
121 */
1abec5a0
MD
122static int test_duration_enqueue(void)
123{
124 return !test_stop_enqueue;
125}
126
127static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
128static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
129
130static unsigned int nr_dispatchers;
131static unsigned int nr_workers;
132
133static struct urcu_workqueue workqueue;
134
135struct test_work {
136 struct urcu_work w;
137};
138
139static void *thr_dispatcher(void *_count)
140{
141 unsigned long long *count = _count;
142 bool was_nonempty;
143
144 printf_verbose("thread_begin %s, tid %lu\n",
145 "dispatcher", urcu_get_thread_id());
146
147 set_affinity();
148
149 while (!test_go)
150 {
151 }
152 cmm_smp_mb();
153
154 for (;;) {
155 struct test_work *work = malloc(sizeof(*work));
156 if (!work)
157 goto fail;
158 printf_verbose("queue work %p\n", work);
159 urcu_queue_work(&workqueue, &work->w);
160 URCU_TLS(nr_enqueues)++;
161
162 if (caa_unlikely(dispatch_delay_loops))
163 loop_sleep(dispatch_delay_loops);
164fail:
165 if (caa_unlikely(!test_duration_enqueue()))
166 break;
167 }
168
169 uatomic_inc(&test_enqueue_stopped);
170 count[0] = URCU_TLS(nr_enqueues);
171 printf_verbose("dispatcher thread_end, tid %lu, "
172 "enqueues %llu\n",
173 urcu_get_thread_id(),
174 URCU_TLS(nr_enqueues));
175 return ((void*)1);
176}
177
178static void *thr_worker(void *_count)
179{
180 unsigned long long *count = _count;
181 unsigned int counter = 0;
182 struct urcu_worker worker;
1abec5a0
MD
183
184 printf_verbose("thread_begin %s, tid %lu\n",
185 "worker", urcu_get_thread_id());
186
187 set_affinity();
188
189 rcu_register_thread();
190 urcu_worker_init(&worker, URCU_WORKER_STEAL);
191 //urcu_worker_init(&worker, 0);
192 urcu_worker_register(&workqueue, &worker);
193
194 while (!test_go)
195 {
196 }
197 cmm_smp_mb();
198
199 for (;;) {
1b0a9891 200 enum urcu_accept_ret ret;
1abec5a0 201
1b0a9891
MD
202 ret = urcu_accept_work(&workqueue, &worker);
203 if (ret == URCU_ACCEPT_SHUTDOWN)
204 break;
1abec5a0
MD
205 for (;;) {
206 struct urcu_work *work;
207 struct test_work *t;
208
209 work = urcu_dequeue_work(&worker);
210 if (!work)
211 break;
212 t = caa_container_of(work, struct test_work, w);
213 printf_verbose("dequeue work %p\n", t);
1abec5a0
MD
214 URCU_TLS(nr_dequeues)++;
215 if (caa_unlikely(work_loops))
216 loop_sleep(work_loops);
217 free(t);
218 }
1abec5a0
MD
219 }
220end:
221 urcu_worker_unregister(&workqueue, &worker);
222 rcu_unregister_thread();
223
224 printf_verbose("worker thread_end, tid %lu, "
225 "dequeues %llu\n",
226 urcu_get_thread_id(),
227 URCU_TLS(nr_dequeues));
228 count[0] = URCU_TLS(nr_dequeues);
229 return ((void*)2);
230}
231
/*
 * Print command-line usage to stdout. @argc is unused; argv[0] is shown
 * as the program name.
 */
static void show_usage(int argc, char **argv)
{
	(void) argc;

	printf("Usage : %s nr_workers nr_dispatchers duration (s) <OPTIONS>\n",
		argv[0]);
	printf("OPTIONS:\n");
	printf(" [-d delay] (dispatcher period (in loops))\n");
	printf(" [-c duration] (worker period (in loops))\n");
	printf(" [-v] (verbose output)\n");
	printf(" [-a cpu#] [-a cpu#]... (affinity)\n");
	/* -w waits on the shared workqueue, not a stack. */
	printf(" [-w] Wait for workers to empty workqueue\n");
	printf("\n");
}
244
245int main(int argc, char **argv)
246{
247 int err;
248 pthread_t *tid_dispatcher, *tid_worker;
249 void *tret;
250 unsigned long long *count_dispatcher, *count_worker;
251 unsigned long long tot_enqueues = 0, tot_dequeues = 0;
252 unsigned long long end_dequeues = 0;
253 int i, a, retval = 0;
254
255 if (argc < 4) {
256 show_usage(argc, argv);
257 return -1;
258 }
259
260 err = sscanf(argv[1], "%u", &nr_workers);
261 if (err != 1) {
262 show_usage(argc, argv);
263 return -1;
264 }
265
266 err = sscanf(argv[2], "%u", &nr_dispatchers);
267 if (err != 1) {
268 show_usage(argc, argv);
269 return -1;
270 }
271
272 err = sscanf(argv[3], "%lu", &duration);
273 if (err != 1) {
274 show_usage(argc, argv);
275 return -1;
276 }
277
278 for (i = 4; i < argc; i++) {
279 if (argv[i][0] != '-')
280 continue;
281 switch (argv[i][1]) {
282 case 'a':
283 if (argc < i + 2) {
284 show_usage(argc, argv);
285 return -1;
286 }
287 a = atoi(argv[++i]);
288 cpu_affinities[next_aff++] = a;
289 use_affinity = 1;
290 printf_verbose("Adding CPU %d affinity\n", a);
291 break;
292 case 'c':
293 if (argc < i + 2) {
294 show_usage(argc, argv);
295 return -1;
296 }
297 work_loops = atol(argv[++i]);
298 break;
299 case 'd':
300 if (argc < i + 2) {
301 show_usage(argc, argv);
302 return -1;
303 }
304 dispatch_delay_loops = atol(argv[++i]);
305 break;
306 case 'v':
307 verbose_mode = 1;
308 break;
309 case 'w':
310 test_wait_empty = 1;
311 break;
312 }
313 }
314
315 printf_verbose("running test for %lu seconds, %u dispatchers, "
316 "%u workers.\n",
317 duration, nr_dispatchers, nr_workers);
318 if (test_wait_empty)
319 printf_verbose("Wait for workers to empty workqueue.\n");
320 printf_verbose("Work duration: %lu loops.\n", work_loops);
321 printf_verbose("Dispatcher arrival delay: %lu loops.\n", dispatch_delay_loops);
322 printf_verbose("thread %-6s, tid %lu\n",
323 "main", urcu_get_thread_id());
324
325 tid_dispatcher = calloc(nr_dispatchers, sizeof(*tid_dispatcher));
326 tid_worker = calloc(nr_workers, sizeof(*tid_worker));
327 count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
328 count_worker = calloc(nr_workers, sizeof(*count_worker));
329 urcu_workqueue_init(&workqueue);
330
331 next_aff = 0;
332
333 for (i = 0; i < nr_dispatchers; i++) {
334 err = pthread_create(&tid_dispatcher[i], NULL, thr_dispatcher,
335 &count_dispatcher[i]);
336 if (err != 0)
337 exit(1);
338 }
339 for (i = 0; i < nr_workers; i++) {
340 err = pthread_create(&tid_worker[i], NULL, thr_worker,
341 &count_worker[i]);
342 if (err != 0)
343 exit(1);
344 }
345
346 cmm_smp_mb();
347
348 test_go = 1;
349
350 for (i = 0; i < duration; i++) {
351 sleep(1);
352 if (verbose_mode)
353 (void) write(1, ".", 1);
354 }
355
356 test_stop_enqueue = 1;
357 while (nr_dispatchers != uatomic_read(&test_enqueue_stopped)) {
358 sleep(1);
359 }
360
361 if (test_wait_empty) {
362 while (!cds_wfcq_empty(&workqueue.head, &workqueue.tail)) {
363 sleep(1);
364 }
365 }
1b0a9891 366 urcu_workqueue_shutdown(&workqueue);
1abec5a0
MD
367
368 for (i = 0; i < nr_dispatchers; i++) {
369 err = pthread_join(tid_dispatcher[i], &tret);
370 if (err != 0)
371 exit(1);
372 tot_enqueues += count_dispatcher[i];
373 }
374 for (i = 0; i < nr_workers; i++) {
375 err = pthread_join(tid_worker[i], &tret);
376 if (err != 0)
377 exit(1);
378 tot_dequeues += count_worker[i];
379 }
380
381 printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
382 "work_loops %lu nr_workers %3u "
383 "nr_enqueues %12llu nr_dequeues %12llu\n",
384 argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
385 nr_workers, tot_enqueues, tot_dequeues);
386 free(count_dispatcher);
387 free(count_worker);
388 free(tid_dispatcher);
389 free(tid_worker);
390 return retval;
391}
This page took 0.036199 seconds and 4 git commands to generate.