LFQ: Fix unpaired lock/unlock
[urcu.git] / tests / test_qsbr_lfq.c
/*
 * test_qsbr_lfq.c
 *
 * Userspace RCU library - example RCU-based lock-free queue
 *
 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#define _GNU_SOURCE
#include "../config.h"
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <assert.h>
#include <sys/syscall.h>
#include <sched.h>
#include <errno.h>

#include <urcu/arch.h>

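/*
 * The uatomic_* primitives used below (uatomic_cmpxchg, uatomic_add_return,
 * uatomic_inc) are assumed to be pulled in through "urcu-qsbr.h" when
 * _LGPL_SOURCE is defined; including them explicitly keeps the
 * DYNAMIC_LINK_TEST build working as well.  The header carries this name in
 * trees of this vintage; newer userspace-rcu renames it to <urcu/uatomic.h>.
 */
#include <urcu/uatomic_arch.h>
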
/* hardcoded number of CPUs */
#define NR_CPUS 16384

#if defined(_syscall0)
_syscall0(pid_t, gettid)
#elif defined(__NR_gettid)
static inline pid_t gettid(void)
{
	return syscall(__NR_gettid);
}
#else
#warning "use pid as tid"
static inline pid_t gettid(void)
{
	return getpid();
}
#endif

#ifndef DYNAMIC_LINK_TEST
#define _LGPL_SOURCE
#endif
#include "urcu-qsbr.h"

static volatile int test_go, test_stop;

/* dequeuer read-side C.S. duration, in loops */
static unsigned long rduration;

/* test duration, in seconds */
static unsigned long duration;

/* enqueuer delay between enqueues, in loops */
static unsigned long wdelay;

static inline void loop_sleep(unsigned long l)
{
	while (l-- != 0)
		cpu_relax();
}

static int verbose_mode;

#define printf_verbose(fmt, args...)	\
	do {				\
		if (verbose_mode)	\
			printf(fmt, args);	\
	} while (0)

static unsigned int cpu_affinities[NR_CPUS];
static unsigned int next_aff = 0;
static int use_affinity = 0;

pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;

#ifndef HAVE_CPU_SET_T
typedef unsigned long cpu_set_t;
# define CPU_ZERO(cpuset) do { *(cpuset) = 0; } while(0)
# define CPU_SET(cpu, cpuset) do { *(cpuset) |= (1UL << (cpu)); } while(0)
#endif

static void set_affinity(void)
{
	cpu_set_t mask;
	int cpu;
	int ret;

	if (!use_affinity)
		return;

#if HAVE_SCHED_SETAFFINITY
	ret = pthread_mutex_lock(&affinity_mutex);
	if (ret) {
		perror("Error in pthread mutex lock");
		exit(-1);
	}
	cpu = cpu_affinities[next_aff++];
	ret = pthread_mutex_unlock(&affinity_mutex);
	if (ret) {
		perror("Error in pthread mutex unlock");
		exit(-1);
	}

	CPU_ZERO(&mask);
	CPU_SET(cpu, &mask);
#if SCHED_SETAFFINITY_ARGS == 2
	sched_setaffinity(0, &mask);
#else
	sched_setaffinity(0, sizeof(mask), &mask);
#endif
#endif /* HAVE_SCHED_SETAFFINITY */
}

/*
 * returns 0 if test should end.
 */
static int test_duration_dequeue(void)
{
	return !test_stop;
}

static int test_duration_enqueue(void)
{
	return !test_stop;
}

static unsigned long long __thread nr_dequeues;
static unsigned long long __thread nr_enqueues;

static unsigned long nr_successful_dequeues;
static unsigned int nr_enqueuers;
static unsigned int nr_dequeuers;


#define ARRAY_POISON 0xDEADBEEF
#define PAGE_SIZE 4096
#define PAGE_MASK (PAGE_SIZE - 1)
#define NODES_PER_PAGE ((PAGE_SIZE - 16) / sizeof (struct node))
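/*
 * Each node_page starts with a 16-byte header (in, out, padding), so
 * NODES_PER_PAGE is the number of nodes fitting in the remainder of a
 * PAGE_SIZE allocation.  ARRAY_POISON is stored into a page just before
 * it is freed, so that use-after-free accesses fail loudly.
 */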

/* Lock-free queue, using RCU to avoid the ABA problem and (more
   interestingly) to efficiently handle freeing memory.

   We have to protect both the enqueuer's and the dequeuer's compare-and-
   exchange operation from running across a free and a subsequent
   reallocation of the same memory.  So, we protect the free with
   synchronize_rcu; this is enough because all the allocations take
   place before the compare-and-exchange ops.

   Besides adding rcu_read_{,un}lock, the enqueue/dequeue are a standard
   implementation of a lock-free queue.  The first node in the queue is
   always a dummy: dequeuing returns the data from HEAD->NEXT, advances
   HEAD to HEAD->NEXT (which will now serve as the dummy node), and frees
   the old HEAD.  Since RCU avoids the ABA problem, no double-word
   compare-and-exchange operations are needed.  Node allocation and
   deallocation are a "black box" and synchronize_rcu is hidden within
   node deallocation.

   So, the tricky part is finding a good allocation strategy for nodes.
   The allocator for nodes is shared by multiple threads, and since
   malloc/free are not lock-free a layer above them is obviously
   necessary: otherwise the whole exercise is useless.  In addition,
   to avoid penalizing dequeues, the allocator should avoid frequent
   synchronization (because synchronize_rcu is expensive).

   The scheme used here uses a page as the allocation unit for nodes.
   A page is freed when no more of its nodes are in use.  Nodes from a
   page are never reused.

   Nodes are allocated from Q->CURRENT.  Since whoever finds a full
   page has to busy wait, we use a trick to limit the duration of busy
   waiting.  A free page Q->FREE is always kept ready, so that any thread
   that allocates the last node in a page, or finds a full page, can try
   to update Q->CURRENT.  Whoever loses the race has to busy wait; on the
   other hand, whoever wins the race has to allocate the new Q->FREE.
   In other words, if the following sequence happens

   Thread 1                      Thread 2                other threads
   -----------------------------------------------------------------------
   Get last node from page
                                 q->current = q->free;
                                                         fill up q->current
   q->current = q->free fails

   then thread 1 no longer has the duty of allocating the new page;
   thread 2 will do that.  If a thread finds a full current page and
   Q->CURRENT == Q->FREE, this means that another thread is going to
   allocate Q->FREE soon, and it busy waits.  After the allocation
   finishes, everything proceeds normally: some thread will take care
   of setting Q->CURRENT and allocating a new Q->FREE.

   One common scheme for allocation is to use a free list (implemented
   as a lock-free stack), but such a free list is potentially unbounded.
   Instead, with the above scheme the number of live pages at any time
   is equal to the number of enqueuing threads.  */

struct node {
	void *data;
	void *next;
};

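/*
 * Per-page allocation bookkeeping: "in" counts nodes handed out from
 * this page, "out" counts nodes released back.  Once "out" reaches
 * NODES_PER_PAGE the page can be reclaimed (see free_node).  The
 * padding keeps the header at 16 bytes, matching NODES_PER_PAGE.
 */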
struct node_page {
	int in;
	int out;
	char padding[16 - sizeof(int) * 2];
	struct node nodes[NODES_PER_PAGE];
};

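/*
 * current: the page nodes are presently allocated from.
 * free:    a spare page kept ready, so the thread that fills up
 *          "current" can install a replacement immediately.
 * head/tail: the queue itself; head always points at a dummy node.
 */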
struct queue {
	struct node_page *current, *free;
	struct node *head, *tail;
};

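/*
 * valloc() returns page-aligned memory; free_node() relies on that
 * alignment to recover the enclosing page from a node pointer by
 * masking off the low bits.  Announcing a quiescent state here is safe
 * because pages are only allocated outside read-side critical sections,
 * and it lets concurrent synchronize_rcu() calls make progress.
 */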
static struct node_page *new_node_page(void)
{
	struct node_page *p = valloc(PAGE_SIZE);
	rcu_quiescent_state();
	p->in = p->out = 0;
	return p;
}

static void free_node_page(struct node_page *p)
{
	/* Help making sure that accessing a dangling pointer is
	   adequately punished. */
	p->in = ARRAY_POISON;
	free(p);
}

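/*
 * Reserve one node from q->current.  The cmpxchg on p->in claims slot i;
 * a thread that is about to take the last slot (or that finds the page
 * already full) first tries to swing q->current to the spare q->free
 * page.  The winner of that race must allocate a new spare; everyone
 * else simply retries (busy waits) until a fresh page is in place.
 */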
static struct node *new_node(struct queue *q)
{
	struct node *n;
	struct node_page *p;
	int i;

	do {
		p = q->current;
		i = p->in;
		if (i >= NODES_PER_PAGE - 1 &&
		    q->free != p &&
		    uatomic_cmpxchg(&q->current, p, q->free) == p)
			q->free = new_node_page();

	} while (i == NODES_PER_PAGE || uatomic_cmpxchg(&p->in, i, i+1) != i);

	assert(i >= 0 && i < NODES_PER_PAGE);
	n = &p->nodes[i];
	n->next = NULL;
	return n;
}

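/*
 * Release a node.  The page is reclaimed only once every node in it has
 * been released, and only after a grace period: synchronize_rcu()
 * guarantees that no enqueuer or dequeuer can still hold a reference to
 * a node in this page when it is poisoned and freed.
 */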
void free_node(struct node *n)
{
	struct node_page *p = (struct node_page *) ((intptr_t) n & ~PAGE_MASK);

	if (uatomic_add_return(&p->out, 1) == NODES_PER_PAGE) {
		rcu_quiescent_state();
		synchronize_rcu();
		free_node_page(p);
	}
}

void init_queue(struct queue *q)
{
	q->current = new_node_page();
	q->free = new_node_page();
	q->head = q->tail = new_node(q);
}

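/*
 * Standard lock-free enqueue, wrapped in a read-side critical section:
 * between reading q->tail/tail->next and the cmpxchg, RCU guarantees
 * that the node "tail" points to cannot be freed and recycled, which is
 * what would otherwise expose the cmpxchg to the ABA problem.
 */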
void enqueue(struct queue *q, void *value)
{
	struct node *n = new_node(q);
	n->data = value;
	rcu_read_lock();
	for (;;) {
		struct node *tail = rcu_dereference(q->tail);
		struct node *next = rcu_dereference(tail->next);
		if (tail != q->tail) {
			/* A change occurred while reading the values. */
			continue;
		}

		if (next) {
			/* Help moving tail further. */
			uatomic_cmpxchg(&q->tail, tail, next);
			continue;
		}

		if (uatomic_cmpxchg(&tail->next, NULL, n) == NULL) {
			/* Move tail (another operation might beat us to it,
			   that's fine). */
			uatomic_cmpxchg(&q->tail, tail, n);
			rcu_read_unlock();
			return;
		}
	}
}

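/*
 * Lock-free dequeue: take the data from head->next, advance head, and
 * release the old head; the former second node becomes the new dummy.
 * *not_empty reports whether a value was actually dequeued, since the
 * enqueuers in this test legitimately enqueue NULL data.
 */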
void *dequeue(struct queue *q, bool *not_empty)
{
	bool dummy;
	if (!not_empty)
		not_empty = &dummy;

	rcu_read_lock();
	*not_empty = false;
	for (;;) {
		void *data;
		struct node *head = rcu_dereference(q->head);
		struct node *tail = rcu_dereference(q->tail);
		struct node *next = rcu_dereference(head->next);

		if (head != q->head) {
			/* A change occurred while reading the values. */
			continue;
		}

		if (head == tail) {
			/* If all three are consistent, the queue is empty. */
			if (!next) {
				rcu_read_unlock();
				return NULL;
			}

			/* Help moving tail further. */
			uatomic_cmpxchg(&q->tail, tail, next);
			continue;
		}

		data = next->data;
		if (uatomic_cmpxchg(&q->head, head, next) == head) {
			/* Next remains as a dummy node, head is freed. */
			rcu_read_unlock();
			*not_empty = true;
			free_node(head);
			return data;
		}
	}
}


static struct queue q;

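/*
 * Enqueuer thread: register with RCU, spin until the main thread flips
 * test_go, then enqueue until test_stop is set, pausing wdelay loops
 * between operations.  The per-thread count is reported via *_count.
 */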
void *thr_enqueuer(void *_count)
{
	unsigned long long *count = _count;

	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
			"enqueuer", pthread_self(), (unsigned long)gettid());

	set_affinity();

	rcu_register_thread();

	while (!test_go)
	{
	}
	smp_mb();

	for (;;) {
		enqueue(&q, NULL);

		if (unlikely(wdelay))
			loop_sleep(wdelay);
		nr_enqueues++;
		if (unlikely(!test_duration_enqueue()))
			break;
	}

	rcu_unregister_thread();

	*count = nr_enqueues;
	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
			"enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues);
	return ((void*)1);
}

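/*
 * Dequeuer thread: same start-up handshake.  Every attempt bumps the
 * per-thread nr_dequeues; attempts that actually returned a value bump
 * the global nr_successful_dequeues.
 */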
void *thr_dequeuer(void *_count)
{
	unsigned long long *count = _count;

	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
			"dequeuer", pthread_self(), (unsigned long)gettid());

	set_affinity();

	rcu_register_thread();

	while (!test_go)
	{
	}
	smp_mb();

	for (;;) {
		bool not_empty;
		dequeue(&q, &not_empty);
		if (not_empty)
			uatomic_inc(&nr_successful_dequeues);

		nr_dequeues++;
		if (unlikely(!test_duration_dequeue()))
			break;
		if (unlikely(rduration))
			loop_sleep(rduration);
	}

	rcu_unregister_thread();

	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
			"dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues);
	*count = nr_dequeues;
	return ((void*)2);
}

void test_end(struct queue *q)
{
	bool not_empty;

	/* Drain whatever is left in the queue, then release the two
	   pages the allocator still holds. */
	do
		dequeue(q, &not_empty);
	while (not_empty);
	if (q->current != q->free)
		free_node_page(q->free);
	free_node_page(q->current);
}

void show_usage(int argc, char **argv)
{
	printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)", argv[0]);
	printf(" [-d delay] (enqueuer period, in loops)");
	printf(" [-c duration] (dequeuer C.S. duration, in loops)");
	printf(" [-v] (verbose output)");
	printf(" [-a cpu#] [-a cpu#]... (affinity)");
	printf("\n");
}

int main(int argc, char **argv)
{
	int err;
	pthread_t *tid_enqueuer, *tid_dequeuer;
	void *tret;
	unsigned long long *count_enqueuer, *count_dequeuer;
	unsigned long long tot_enqueues = 0, tot_dequeues = 0;
	int i, a;

	if (argc < 4) {
		show_usage(argc, argv);
		return -1;
	}

	err = sscanf(argv[1], "%u", &nr_dequeuers);
	if (err != 1) {
		show_usage(argc, argv);
		return -1;
	}

	err = sscanf(argv[2], "%u", &nr_enqueuers);
	if (err != 1) {
		show_usage(argc, argv);
		return -1;
	}

	err = sscanf(argv[3], "%lu", &duration);
	if (err != 1) {
		show_usage(argc, argv);
		return -1;
	}

	for (i = 4; i < argc; i++) {
		if (argv[i][0] != '-')
			continue;
		switch (argv[i][1]) {
		case 'a':
			if (argc < i + 2) {
				show_usage(argc, argv);
				return -1;
			}
			a = atoi(argv[++i]);
			cpu_affinities[next_aff++] = a;
			use_affinity = 1;
			printf_verbose("Adding CPU %d affinity\n", a);
			break;
		case 'c':
			if (argc < i + 2) {
				show_usage(argc, argv);
				return -1;
			}
			rduration = atol(argv[++i]);
			break;
		case 'd':
			if (argc < i + 2) {
				show_usage(argc, argv);
				return -1;
			}
			wdelay = atol(argv[++i]);
			break;
		case 'v':
			verbose_mode = 1;
			break;
		}
	}

	printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n",
		duration, nr_enqueuers, nr_dequeuers);
	printf_verbose("Writer delay : %lu loops.\n", wdelay);
	printf_verbose("Reader duration : %lu loops.\n", rduration);
	printf_verbose("thread %-6s, thread id : %lx, tid %lu\n",
			"main", pthread_self(), (unsigned long)gettid());

	tid_enqueuer = malloc(sizeof(*tid_enqueuer) * nr_enqueuers);
	tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
	count_enqueuer = malloc(sizeof(*count_enqueuer) * nr_enqueuers);
	count_dequeuer = malloc(sizeof(*count_dequeuer) * nr_dequeuers);
	init_queue(&q);

	next_aff = 0;

	for (i = 0; i < nr_enqueuers; i++) {
		err = pthread_create(&tid_enqueuer[i], NULL, thr_enqueuer,
				     &count_enqueuer[i]);
		if (err != 0)
			exit(1);
	}
	for (i = 0; i < nr_dequeuers; i++) {
		err = pthread_create(&tid_dequeuer[i], NULL, thr_dequeuer,
				     &count_dequeuer[i]);
		if (err != 0)
			exit(1);
	}

	smp_mb();

	test_go = 1;

	for (i = 0; i < duration; i++) {
		sleep(1);
		if (verbose_mode)
			write(1, ".", 1);
	}

	test_stop = 1;

	for (i = 0; i < nr_enqueuers; i++) {
		err = pthread_join(tid_enqueuer[i], &tret);
		if (err != 0)
			exit(1);
		tot_enqueues += count_enqueuer[i];
	}
	for (i = 0; i < nr_dequeuers; i++) {
		err = pthread_join(tid_dequeuer[i], &tret);
		if (err != 0)
			exit(1);
		tot_dequeues += count_dequeuer[i];
	}

	printf_verbose("total number of enqueues : %llu, dequeues %llu\n",
		tot_enqueues, tot_dequeues);
	printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu "
		"nr_dequeuers %3u "
		"rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n",
		argv[0], duration, nr_enqueuers, wdelay,
		nr_dequeuers, rduration, tot_enqueues, tot_dequeues,
		nr_successful_dequeues, tot_enqueues + tot_dequeues);

	test_end(&q);
	free(tid_enqueuer);
	free(tid_dequeuer);
	free(count_enqueuer);
	free(count_dequeuer);
	return 0;
}