/*
 * test_urcu_lfq.c
 *
 * Userspace RCU library - example RCU-based lock-free queue
 *
 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#define _GNU_SOURCE
#include "../config.h"
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <assert.h>
#include <sys/syscall.h>
#include <sched.h>
#include <errno.h>

#include <urcu/arch.h>

/* hardcoded number of CPUs */
#define NR_CPUS 16384

#if defined(_syscall0)
_syscall0(pid_t, gettid)
#elif defined(__NR_gettid)
static inline pid_t gettid(void)
{
        return syscall(__NR_gettid);
}
#else
#warning "use pid as tid"
static inline pid_t gettid(void)
{
        return getpid();
}
#endif

#ifndef DYNAMIC_LINK_TEST
#define _LGPL_SOURCE
#endif
#include <urcu.h>

static volatile int test_go, test_stop;

/* dequeuer period, in loops */
static unsigned long rduration;

/* test duration, in seconds */
static unsigned long duration;

/* enqueuer period, in loops */
static unsigned long wdelay;

static inline void loop_sleep(unsigned long l)
{
        while (l-- != 0)
                cpu_relax();
}

static int verbose_mode;

#define printf_verbose(fmt, args...)            \
        do {                                    \
                if (verbose_mode)               \
                        printf(fmt, args);      \
        } while (0)

static unsigned int cpu_affinities[NR_CPUS];
static unsigned int next_aff = 0;
static int use_affinity = 0;

pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;

#ifndef HAVE_CPU_SET_T
typedef unsigned long cpu_set_t;
# define CPU_ZERO(cpuset) do { *(cpuset) = 0; } while(0)
# define CPU_SET(cpu, cpuset) do { *(cpuset) |= (1UL << (cpu)); } while(0)
#endif

static void set_affinity(void)
{
        cpu_set_t mask;
        int cpu;
        int ret;

        if (!use_affinity)
                return;

#if HAVE_SCHED_SETAFFINITY
        ret = pthread_mutex_lock(&affinity_mutex);
        if (ret) {
                perror("Error in pthread mutex lock");
                exit(-1);
        }
        cpu = cpu_affinities[next_aff++];
        ret = pthread_mutex_unlock(&affinity_mutex);
        if (ret) {
                perror("Error in pthread mutex unlock");
                exit(-1);
        }

        CPU_ZERO(&mask);
        CPU_SET(cpu, &mask);
#if SCHED_SETAFFINITY_ARGS == 2
        sched_setaffinity(0, &mask);
#else
        sched_setaffinity(0, sizeof(mask), &mask);
#endif
#endif /* HAVE_SCHED_SETAFFINITY */
}

/*
 * returns 0 if test should end.
 */
static int test_duration_dequeue(void)
{
        return !test_stop;
}

static int test_duration_enqueue(void)
{
        return !test_stop;
}

static unsigned long long __thread nr_dequeues;
static unsigned long long __thread nr_enqueues;

static unsigned long nr_successful_dequeues;
static unsigned int nr_enqueuers;
static unsigned int nr_dequeuers;

#define ARRAY_POISON 0xDEADBEEF
#define PAGE_SIZE 4096
#define PAGE_MASK (PAGE_SIZE - 1)
#define NODES_PER_PAGE ((PAGE_SIZE - 16) / sizeof (struct node))

/* Lock-free queue, using RCU to avoid the ABA problem and (more
   interestingly) to efficiently handle freeing memory.

   We have to protect both the enqueuer's and dequeuer's compare-and-
   exchange operation from running across a free and a subsequent
   reallocation of the same memory.  So, we protect the free with
   synchronize_rcu; this is enough because all the allocations take
   place before the compare-and-exchange ops.

   Besides adding rcu_read_{,un}lock, the enqueue/dequeue are a standard
   implementation of a lock-free queue.  The first node in the queue is
   always a dummy: dequeuing returns the data from HEAD->NEXT, advances
   HEAD to HEAD->NEXT (which will now serve as the dummy node), and frees
   the old HEAD.  Since RCU avoids the ABA problem, no double-word
   compare-and-exchange operations are needed.  Node allocation and
   deallocation are a "black box" and synchronize_rcu is hidden within
   node deallocation.

   So, the tricky part is finding a good allocation strategy for nodes.
   The allocator for nodes is shared by multiple threads, and since
   malloc/free are not lock-free a layer above them is obviously
   necessary: otherwise the whole exercise is useless.  In addition,
   to avoid penalizing dequeues, the allocator should avoid frequent
   synchronization (because synchronize_rcu is expensive).

   The scheme used here uses a page as the allocation unit for nodes.
   A page is freed when none of its nodes is in use anymore.  Nodes
   from a page are never reused.

   Nodes are allocated from Q->CURRENT.  Since whoever finds a full
   page has to busy wait, we use a trick to limit the duration of busy
   waiting.  A free page Q->FREE is always kept ready, so that any thread
   that allocates the last node in a page, or finds a full page, can try
   to update Q->CURRENT.  Whoever loses the race has to busy wait; on the
   other hand, whoever wins the race has to allocate the new Q->FREE.
   In other words, if the following sequence happens

       Thread 1                    Thread 2              other threads
       ----------------------------------------------------------------
       Get last node from page
                                   q->current = q->free;
                                                         fill up q->current
       q->current = q->free fails

   then thread 1 no longer has the duty of allocating q->current;
   thread 2 will do that.  If a thread finds a full current page and
   Q->CURRENT == Q->FREE, this means that another thread is going to
   allocate Q->FREE soon, so it busy waits.  After the allocation
   finishes, everything proceeds normally: some thread will take care
   of setting Q->CURRENT and allocating a new Q->FREE.

   One common scheme for allocation is to use a free list (implemented
   as a lock-free stack), but such a free list is potentially unbounded.
   Instead, with the above scheme the number of live pages at any time
   is equal to the number of enqueuing threads.  */

struct node {
        void *data;
        void *next;
};

struct node_page {
        int in;
        int out;
        char padding[16 - sizeof(int) * 2];
        struct node nodes[NODES_PER_PAGE];
};
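
/*
 * Worked example of the sizing above (illustrative only; assumes a 4-byte
 * int and 8-byte pointers, i.e. a typical LP64 target):
 *
 *   sizeof(struct node)       == 16 bytes (two pointers)
 *   node_page header          == 2 * sizeof(int) + padding == 16 bytes
 *   NODES_PER_PAGE            == (4096 - 16) / 16 == 255
 *   sizeof(struct node_page)  == 16 + 255 * 16 == 4096 == PAGE_SIZE
 *
 * valloc() below returns page-aligned memory, which is what lets
 * free_node() recover the enclosing page with "(intptr_t) n & ~PAGE_MASK".
 */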

struct queue {
        struct node_page *current, *free;
        struct node *head, *tail;
};

static struct node_page *new_node_page(void)
{
        struct node_page *p = valloc(PAGE_SIZE);
        assert(p);
        p->in = p->out = 0;
        return p;
}

static void free_node_page(struct node_page *p)
{
        /* Help make sure that accessing a dangling pointer is
           adequately punished.  */
        p->in = ARRAY_POISON;
        free(p);
}

static struct node *new_node(struct queue *q)
{
        struct node *n;
        struct node_page *p;
        int i;

        do {
                p = q->current;
                i = p->in;
                if (i >= NODES_PER_PAGE - 1 &&
                    q->free != p &&
                    uatomic_cmpxchg(&q->current, p, q->free) == p)
                        q->free = new_node_page();

        } while (i == NODES_PER_PAGE || uatomic_cmpxchg(&p->in, i, i + 1) != i);

        assert(i >= 0 && i < NODES_PER_PAGE);
        n = &p->nodes[i];
        n->next = NULL;
        return n;
}

void free_node(struct node *n)
{
        struct node_page *p = (struct node_page *) ((intptr_t) n & ~PAGE_MASK);

        if (uatomic_add_return(&p->out, 1) == NODES_PER_PAGE) {
                synchronize_rcu();
                free_node_page(p);
        }
}
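
/*
 * Illustrative timeline of a page's lifetime (not part of the original
 * comments; the count assumes NODES_PER_PAGE == 255 as computed above):
 *
 *   - new_node() hands out slots 0..254 of p->nodes[], bumping p->in;
 *   - each free_node() bumps p->out; the caller that makes p->out reach
 *     255 is the last user of the page, so it alone calls
 *     synchronize_rcu() and then free_node_page(p);
 *   - the grace period guarantees that any concurrent enqueue/dequeue
 *     still dereferencing a node of this page inside its RCU read-side
 *     critical section has finished before the memory is returned.
 */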

void init_queue(struct queue *q)
{
        q->current = new_node_page();
        q->free = new_node_page();
        q->head = q->tail = new_node(q);
}

void enqueue(struct queue *q, void *value)
{
        struct node *n = new_node(q);
        n->data = value;
        rcu_read_lock();
        for (;;) {
                struct node *tail = rcu_dereference(q->tail);
                struct node *next = rcu_dereference(tail->next);
                if (tail != q->tail) {
                        /* A change occurred while reading the values. */
                        continue;
                }

                if (next) {
                        /* Help moving tail further. */
                        uatomic_cmpxchg(&q->tail, tail, next);
                        continue;
                }

                if (uatomic_cmpxchg(&tail->next, NULL, n) == NULL) {
                        /* Move tail (another operation might beat us to it,
                           that's fine).  */
                        uatomic_cmpxchg(&q->tail, tail, n);
                        rcu_read_unlock();
                        return;
                }
        }
}

void *dequeue(struct queue *q, bool *not_empty)
{
        bool dummy;
        if (!not_empty)
                not_empty = &dummy;

        rcu_read_lock();
        *not_empty = false;
        for (;;) {
                void *data;
                struct node *head = rcu_dereference(q->head);
                struct node *tail = rcu_dereference(q->tail);
                struct node *next = rcu_dereference(head->next);

                if (head != q->head) {
                        /* A change occurred while reading the values. */
                        continue;
                }

                if (head == tail) {
                        /* If all three are consistent, the queue is empty. */
                        if (!next) {
                                rcu_read_unlock();
                                return NULL;
                        }

                        /* Help moving tail further. */
                        uatomic_cmpxchg(&q->tail, tail, next);
                        continue;
                }

                data = next->data;
                if (uatomic_cmpxchg(&q->head, head, next) == head) {
                        /* Next remains as a dummy node, head is freed. */
                        rcu_read_unlock();
                        *not_empty = true;
                        free_node(head);
                        return data;
                }
        }
}
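
/*
 * Minimal single-threaded usage sketch of the queue API above (illustrative
 * only, not compiled into the test; "some_pointer" is an arbitrary payload):
 *
 *   struct queue myq;
 *   bool not_empty;
 *   void *data;
 *
 *   rcu_register_thread();          // enqueue/dequeue use rcu_read_lock()
 *   init_queue(&myq);
 *   enqueue(&myq, some_pointer);
 *   data = dequeue(&myq, &not_empty);
 *   // not_empty == true, data == some_pointer
 *   data = dequeue(&myq, &not_empty);
 *   // not_empty == false, data == NULL: the queue is now empty
 *   rcu_unregister_thread();
 */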

static struct queue q;

void *thr_enqueuer(void *_count)
{
        unsigned long long *count = _count;

        printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
                        "enqueuer", pthread_self(), (unsigned long)gettid());

        set_affinity();

        rcu_register_thread();

        while (!test_go)
        {
        }
        smp_mb();

        for (;;) {
                enqueue(&q, NULL);

                if (unlikely(wdelay))
                        loop_sleep(wdelay);
                nr_enqueues++;
                if (unlikely(!test_duration_enqueue()))
                        break;
        }

        rcu_unregister_thread();

        *count = nr_enqueues;
        printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
                        "enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues);
        return ((void*)1);
}

void *thr_dequeuer(void *_count)
{
        unsigned long long *count = _count;

        printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
                        "dequeuer", pthread_self(), (unsigned long)gettid());

        set_affinity();

        rcu_register_thread();

        while (!test_go)
        {
        }
        smp_mb();

        for (;;) {
                bool not_empty;
                dequeue(&q, &not_empty);
                if (not_empty)
                        uatomic_inc(&nr_successful_dequeues);

                nr_dequeues++;
                if (unlikely(!test_duration_dequeue()))
                        break;
                if (unlikely(rduration))
                        loop_sleep(rduration);
        }

        rcu_unregister_thread();

        printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
                        "dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues);
        *count = nr_dequeues;
        return ((void*)2);
}

void test_end(struct queue *q)
{
        bool not_empty;

        /* Drain remaining elements so that their pages get freed. */
        do
                dequeue(q, &not_empty);
        while (not_empty);
        if (q->current != q->free)
                free_node_page(q->free);
        free_node_page(q->current);
}

void show_usage(int argc, char **argv)
{
        printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)", argv[0]);
        printf(" [-d delay] (enqueuer period (in loops))");
        printf(" [-c duration] (dequeuer period (in loops))");
        printf(" [-v] (verbose output)");
        printf(" [-a cpu#] [-a cpu#]... (affinity)");
        printf("\n");
}
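
/*
 * Example invocation (illustrative; the binary name and CPU numbers are
 * placeholders): run 4 dequeuers and 4 enqueuers for 10 seconds, with a
 * 100-loop enqueuer delay, verbose output, pinned to CPUs 0 and 1:
 *
 *   ./test_urcu_lfq 4 4 10 -d 100 -v -a 0 -a 1
 */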

int main(int argc, char **argv)
{
        int err;
        pthread_t *tid_enqueuer, *tid_dequeuer;
        void *tret;
        unsigned long long *count_enqueuer, *count_dequeuer;
        unsigned long long tot_enqueues = 0, tot_dequeues = 0;
        int i, a;

        if (argc < 4) {
                show_usage(argc, argv);
                return -1;
        }

        err = sscanf(argv[1], "%u", &nr_dequeuers);
        if (err != 1) {
                show_usage(argc, argv);
                return -1;
        }

        err = sscanf(argv[2], "%u", &nr_enqueuers);
        if (err != 1) {
                show_usage(argc, argv);
                return -1;
        }

        err = sscanf(argv[3], "%lu", &duration);
        if (err != 1) {
                show_usage(argc, argv);
                return -1;
        }

        for (i = 4; i < argc; i++) {
                if (argv[i][0] != '-')
                        continue;
                switch (argv[i][1]) {
                case 'a':
                        if (argc < i + 2) {
                                show_usage(argc, argv);
                                return -1;
                        }
                        a = atoi(argv[++i]);
                        cpu_affinities[next_aff++] = a;
                        use_affinity = 1;
                        printf_verbose("Adding CPU %d affinity\n", a);
                        break;
                case 'c':
                        if (argc < i + 2) {
                                show_usage(argc, argv);
                                return -1;
                        }
                        rduration = atol(argv[++i]);
                        break;
                case 'd':
                        if (argc < i + 2) {
                                show_usage(argc, argv);
                                return -1;
                        }
                        wdelay = atol(argv[++i]);
                        break;
                case 'v':
                        verbose_mode = 1;
                        break;
                }
        }

        printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n",
                        duration, nr_enqueuers, nr_dequeuers);
        printf_verbose("Enqueuer delay : %lu loops.\n", wdelay);
        printf_verbose("Dequeuer duration : %lu loops.\n", rduration);
        printf_verbose("thread %-6s, thread id : %lx, tid %lu\n",
                        "main", pthread_self(), (unsigned long)gettid());

        tid_enqueuer = malloc(sizeof(*tid_enqueuer) * nr_enqueuers);
        tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
        count_enqueuer = malloc(sizeof(*count_enqueuer) * nr_enqueuers);
        count_dequeuer = malloc(sizeof(*count_dequeuer) * nr_dequeuers);
        init_queue(&q);

        next_aff = 0;

        for (i = 0; i < nr_enqueuers; i++) {
                err = pthread_create(&tid_enqueuer[i], NULL, thr_enqueuer,
                                &count_enqueuer[i]);
                if (err != 0)
                        exit(1);
        }
        for (i = 0; i < nr_dequeuers; i++) {
                err = pthread_create(&tid_dequeuer[i], NULL, thr_dequeuer,
                                &count_dequeuer[i]);
                if (err != 0)
                        exit(1);
        }

        smp_mb();

        test_go = 1;

        for (i = 0; i < duration; i++) {
                sleep(1);
                if (verbose_mode)
                        write(1, ".", 1);
        }

        test_stop = 1;

        for (i = 0; i < nr_enqueuers; i++) {
                err = pthread_join(tid_enqueuer[i], &tret);
                if (err != 0)
                        exit(1);
                tot_enqueues += count_enqueuer[i];
        }
        for (i = 0; i < nr_dequeuers; i++) {
                err = pthread_join(tid_dequeuer[i], &tret);
                if (err != 0)
                        exit(1);
                tot_dequeues += count_dequeuer[i];
        }

        printf_verbose("total number of enqueues : %llu, dequeues %llu\n",
                        tot_enqueues, tot_dequeues);
        printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu "
                "nr_dequeuers %3u "
                "rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n",
                argv[0], duration, nr_enqueuers, wdelay,
                nr_dequeuers, rduration, tot_enqueues, tot_dequeues,
                nr_successful_dequeues, tot_enqueues + tot_dequeues);

        test_end(&q);
        free(tid_enqueuer);
        free(tid_dequeuer);
        free(count_enqueuer);
        free(count_dequeuer);
        return 0;
}