4 * Userspace RCU library - example RCU-based lock-free queue
6 * Copyright February 2010 - Paolo Bonzini <pbonzini@redhat.com>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 #include "../config.h"
31 #include <sys/types.h>
36 #include <sys/syscall.h>
40 #include <urcu/arch.h>
42 /* hardcoded number of CPUs */
/*
 * gettid() compatibility shim: pick whichever way of issuing the gettid
 * system call this toolchain supports.  NOTE(review): the bodies of these
 * alternatives are only partially visible in this extract; the final
 * #warning branch presumably falls back to the process id - confirm
 * against the full file.
 */
45 #if defined(_syscall0)
46 _syscall0(pid_t
, gettid
)
47 #elif defined(__NR_gettid)
48 static inline pid_t
gettid(void)
/* Issue the raw syscall; glibc historically shipped no gettid() wrapper. */
50 return syscall(__NR_gettid
);
53 #warning "use pid as tid"
54 static inline pid_t
gettid(void)
60 #ifndef DYNAMIC_LINK_TEST
/* Start/stop flags polled by the worker threads (volatile, not atomic). */
66 static volatile int test_go
, test_stop
;
/* Dequeuer-side pause between operations, in busy loops (-c option). */
68 static unsigned long rduration
;
/* Total test duration in seconds (third positional argument of main). */
70 static unsigned long duration
;
72 /* read-side C.S. duration, in loops */
/* NOTE(review): wdelay is the enqueuer delay (-d option); the comment
   above appears to describe rduration instead - confirm. */
73 static unsigned long wdelay
;
/* Busy-wait for l loop iterations (body not visible in this extract). */
75 static inline void loop_sleep(unsigned long l
)
/* Set by -v; gates printf_verbose() output. */
81 static int verbose_mode
;
/* Verbose logging macro (its continuation body was lost in this extract). */
83 #define printf_verbose(fmt, args...) \
89 static unsigned int cpu_affinities
[NR_CPUS
];
90 static unsigned int next_aff
= 0;
91 static int use_affinity
= 0;
/* Serializes next_aff so each thread claims a distinct affinity slot. */
93 pthread_mutex_t affinity_mutex
= PTHREAD_MUTEX_INITIALIZER
;
/* Minimal cpu_set_t fallback for platforms lacking the real type:
   a one-word bitmask with matching CPU_ZERO/CPU_SET helpers. */
95 #ifndef HAVE_CPU_SET_T
96 typedef unsigned long cpu_set_t
;
97 # define CPU_ZERO(cpuset) do { *(cpuset) = 0; } while(0)
98 # define CPU_SET(cpu, cpuset) do { *(cpuset) |= (1UL << (cpu)); } while(0)
/*
 * Pin the calling thread to the next CPU from cpu_affinities[].
 * NOTE(review): the declarations of ret, cpu and mask, the
 * CPU_ZERO/CPU_SET calls and several error checks are missing from
 * this extract.
 */
101 static void set_affinity(void)
110 #if HAVE_SCHED_SETAFFINITY
111 ret
= pthread_mutex_lock(&affinity_mutex
);
113 perror("Error in pthread mutex lock");
/* Take the next unclaimed affinity slot under the mutex. */
116 cpu
= cpu_affinities
[next_aff
++];
117 ret
= pthread_mutex_unlock(&affinity_mutex
);
119 perror("Error in pthread mutex unlock");
/* Older glibc takes (pid, mask); newer takes (pid, size, mask). */
125 #if SCHED_SETAFFINITY_ARGS == 2
126 sched_setaffinity(0, &mask
);
128 sched_setaffinity(0, sizeof(mask
), &mask
);
130 #endif /* HAVE_SCHED_SETAFFINITY */
/* Test-duration predicates (bodies not visible in this extract) and
   the per-thread / global operation counters. */
134 * returns 0 if test should end.
136 static int test_duration_dequeue(void)
141 static int test_duration_enqueue(void)
/* Per-thread operation counters, reported back to main via *count. */
146 static unsigned long long __thread nr_dequeues
;
147 static unsigned long long __thread nr_enqueues
;
/* Shared: dequeues that actually returned a value (bumped with uatomic_inc). */
149 static unsigned long nr_successful_dequeues
;
/* Thread counts parsed from the command line. */
150 static unsigned int nr_enqueuers
;
151 static unsigned int nr_dequeuers
;
/*
 * Node-allocator tuning.
 *
 * ARRAY_POISON: value written over a freed page's header so that any
 * access through a dangling node pointer is likely to be noticed.
 *
 * PAGE_SIZE / PAGE_MASK: nodes are carved out of page-aligned pages
 * (see new_node_page), so a node's page header can be recovered with
 * `ptr & ~PAGE_MASK` (see free_node).
 *
 * NODES_PER_PAGE: number of struct node slots left in a page after the
 * 16-byte page header.  The whole expansion is parenthesized so the
 * macro behaves as a single value in any expression (e.g.
 * `x / NODES_PER_PAGE` or `x % NODES_PER_PAGE`); the previous
 * definition expanded to `(PAGE_SIZE - 16) / sizeof (struct node)`
 * unparenthesized, which mis-associates in such contexts.
 */
#define ARRAY_POISON 0xDEADBEEF
#define PAGE_SIZE 4096
#define PAGE_MASK (PAGE_SIZE - 1)
#define NODES_PER_PAGE ((PAGE_SIZE - 16) / sizeof (struct node))
159 /* Lock-free queue, using the RCU to avoid the ABA problem and (more
160 interestingly) to efficiently handle freeing memory.
162 We have to protect both the enqueuer and dequeuer's compare-and-
163 exchange operation from running across a free and a subsequent
164 reallocation of the same memory. So, we protect the free with
165 synchronize_rcu; this is enough because all the allocations take
166 place before the compare-and-exchange ops.
168 Besides adding rcu_read_{,un}lock, the enqueue/dequeue are a standard
169 implementation of a lock-free-queue. The first node in the queue is
170 always dummy: dequeuing returns the data from HEAD->NEXT, advances
171 HEAD to HEAD->NEXT (which will now serve as dummy node), and frees the
172 old HEAD. Since RCU avoids the ABA problem, it doesn't use double-word
173 compare-and-exchange operations. Node allocation and deallocation are
174 a "black box" and synchronize_rcu is hidden within node deallocation.
176 So, the tricky part is finding a good allocation strategy for nodes.
177 The allocator for nodes is shared by multiple threads, and since
178 malloc/free are not lock-free a layer above them is obviously
179 necessary: otherwise the whole exercise is useless. In addition,
180 to avoid penalizing dequeues, the allocator should avoid frequent
181 synchronization (because synchronize_rcu is expensive).
183 The scheme that is used here uses a page as the allocation
184 unit for nodes. A page is freed when no more nodes are in use.
185 Nodes from a page are never reused.
187 The nodes are allocated from Q->CURRENT. Since whoever finds a full
188 page has to busy wait, we use a trick to limit the duration of busy
189 waiting. A free page Q->FREE is always kept ready, so that any thread
190 that allocates the last node in a page, or finds a full page can try
191 to update Q->CURRENT. Whoever loses the race has to busy wait, OTOH
192 whoever wins the race has to allocate the new Q->FREE. In other words,
193 if the following sequence happens
195 Thread 1 Thread 2 other threads
196 -----------------------------------------------------------------------
197 Get last node from page
198 q->current = q->free;
200 q->current = q->free fails
202 then thread 1 no longer has the duty of allocating q->current;
203 thread 2 will do that. If a thread finds a full current page and
204 Q->CURRENT == Q->FREE, this means that another thread is going to
205 allocate Q->FREE soon, and it busy waits. After the allocation
206 finishes, everything proceeds normally: some thread will take care
207 of setting Q->CURRENT and allocating a new Q->FREE.
209 One common scheme for allocation is to use a free list (implemented
210 as a lock-free stack), but this free list is potentially unbounded.
211 Instead, with the above scheme the number of live pages at any time
212 is equal to the number of enqueuing threads. */
/* Tail of struct node_page: pad the header (presumably the two int
   in/out counters - confirm against the full file) up to 16 bytes,
   the same 16 subtracted in NODES_PER_PAGE, then the node array. */
222 char padding
[16 - sizeof(int) * 2];
223 struct node nodes
[NODES_PER_PAGE
];
/* struct queue: current allocation page plus the pre-allocated spare
   (see the design comment above). */
228 struct node_page
*current
, *free
;
/* FIFO endpoints; the first node is always a dummy. */
229 struct node
*head
, *tail
;
/*
 * Allocate one page-sized allocation unit of nodes.  valloc() returns
 * page-aligned memory, which is what lets free_node() recover the page
 * header from a node pointer with `& ~PAGE_MASK`.  NOTE(review): the
 * initialization of the page header is not visible in this extract.
 */
232 static struct node_page
*new_node_page()
234 struct node_page
*p
= valloc (PAGE_SIZE
);
/*
 * Release a page once no node in it is live (called from free_node()
 * when the out counter reaches NODES_PER_PAGE, and from test_end()).
 * The in counter is poisoned first to catch late users of the page.
 */
239 static void free_node_page(struct node_page
*p
)
241 /* Help making sure that accessing a dangling pointer is
242 adequately punished. */
243 p
->in
= ARRAY_POISON
;
/*
 * Carve a node out of q->current by atomically bumping the page's `in`
 * counter.  Whoever takes the last slot (or finds the page full) tries
 * to publish q->free as the new current page; the winner of that race
 * must then allocate a fresh q->free spare (see the design comment
 * above).  NOTE(review): the enclosing do { } loop header and the
 * reads of p and i are missing from this extract.
 */
247 static struct node
*new_node(struct queue
*q
)
/* Last slot taken or page already full: try to install the spare. */
256 if (i
>= NODES_PER_PAGE
- 1 &&
258 uatomic_cmpxchg(&q
->current
, p
, q
->free
) == p
)
/* We won the race, so allocating the replacement spare is our duty. */
259 q
->free
= new_node_page();
/* Retry if the page was full, or another thread claimed slot i first. */
261 } while (i
== NODES_PER_PAGE
|| uatomic_cmpxchg(&p
->in
, i
, i
+1) != i
);
263 assert (i
>= 0 && i
< NODES_PER_PAGE
);
/*
 * Return a node to its page.  The page header is recovered by masking
 * the node's address down to the page boundary (pages come from
 * valloc(), hence PAGE_SIZE-aligned).  When the out counter shows that
 * every node of the page has been freed, the page itself is released;
 * NOTE(review): the body of that branch (presumably synchronize_rcu
 * followed by free_node_page, per the design comment above) is missing
 * from this extract.
 */
269 void free_node(struct node
*n
)
271 struct node_page
*p
= (struct node_page
*) ((intptr_t) n
& ~PAGE_MASK
);
273 if (uatomic_add_return(&p
->out
, 1) == NODES_PER_PAGE
) {
/*
 * Prepare a queue: one current allocation page, one pre-allocated
 * spare, and a first node serving as both head and tail (the first
 * node is always a dummy - see the design comment above).
 */
279 void init_queue(struct queue
*q
)
281 q
->current
= new_node_page();
282 q
->free
= new_node_page();
283 q
->head
= q
->tail
= new_node(q
);
/*
 * Lock-free enqueue (RCU-protected; see the design comment above).
 * Re-reads q->tail to detect concurrent changes, helps advance a
 * lagging tail, then links the new node with a CAS on tail->next.
 * NOTE(review): the rcu_read_lock/unlock calls, the retry-loop header
 * and the initialization of the new node are missing from this
 * extract.
 */
286 void enqueue(struct queue
*q
, void *value
)
288 struct node
*n
= new_node(q
);
292 struct node
*tail
= rcu_dereference(q
->tail
);
293 struct node
*next
= rcu_dereference(tail
->next
);
294 if (tail
!= q
->tail
) {
295 /* A change occurred while reading the values. */
/* Tail was lagging (next non-NULL): help advance it, then retry. */
300 /* Help moving tail further. */
301 uatomic_cmpxchg(&q
->tail
, tail
, next
);
/* Link the new node; on success we also try the tail update below. */
305 if (uatomic_cmpxchg(&tail
->next
, NULL
, n
) == NULL
) {
306 /* Move tail (another operation might beat us to it,
308 uatomic_cmpxchg(&q
->tail
, tail
, n
);
/*
 * Lock-free dequeue.  Returns the value from head->next and advances
 * head; the old dummy head is freed and head->next becomes the new
 * dummy (see the design comment above).  *not_empty presumably reports
 * whether a value was obtained - confirm against the full file.
 * NOTE(review): the RCU read-side critical section, the retry loop and
 * the empty-queue handling are only partially visible in this extract.
 */
315 void *dequeue(struct queue
*q
, bool *not_empty
)
325 struct node
*head
= rcu_dereference(q
->head
);
326 struct node
*tail
= rcu_dereference(q
->tail
);
327 struct node
*next
= rcu_dereference(head
->next
);
329 if (head
!= q
->head
) {
330 /* A change occurred while reading the values. */
335 /* If all three are consistent, the queue is empty. */
341 /* Help moving tail further. */
342 uatomic_cmpxchg(&q
->tail
, tail
, next
);
/* Swing head to next; the winner of this CAS frees the old dummy. */
347 if (uatomic_cmpxchg(&q
->head
, head
, next
) == head
) {
348 /* Next remains as a dummy node, head is freed. */
/* The queue under test, shared by all worker threads. */
358 static struct queue q
;
/*
 * Enqueuer thread body: register with RCU, enqueue in a loop with an
 * optional wdelay pause, then report the per-thread count through
 * *_count.  NOTE(review): the main loop body (enqueue call, test_go
 * wait) is missing from this extract; also the %d in the thread_end
 * format does not match the unsigned long long nr_enqueues argument.
 */
360 void *thr_enqueuer(void *_count
)
362 unsigned long long *count
= _count
;
364 printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
365 "enqueuer", pthread_self(), (unsigned long)gettid());
369 rcu_register_thread();
379 if (unlikely(wdelay
))
382 if (unlikely(!test_duration_enqueue()))
386 rcu_unregister_thread();
388 *count
= nr_enqueues
;
389 printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n",
390 "enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues
);
/*
 * Dequeuer thread body: register with RCU, dequeue in a loop (counting
 * successful dequeues), optionally busy-wait rduration loops between
 * operations, then report the per-thread count through *_count.
 * NOTE(review): "¬_empty" below is mojibake for "&not_empty"
 * (HTML-entity corruption); the declaration of not_empty and the loop
 * header are missing from this extract.  The %d in the thread_end
 * format also mismatches the unsigned long long nr_dequeues argument.
 */
395 void *thr_dequeuer(void *_count
)
397 unsigned long long *count
= _count
;
399 printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
400 "dequeuer", pthread_self(), (unsigned long)gettid());
404 rcu_register_thread();
413 dequeue (&q
, ¬_empty
);
/* Count only dequeues that actually returned a value. */
415 uatomic_inc (&nr_successful_dequeues
);
418 if (unlikely(!test_duration_dequeue()))
420 if (unlikely(rduration
))
421 loop_sleep(rduration
);
424 rcu_unregister_thread();
426 printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n",
427 "dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues
);
428 *count
= nr_dequeues
;
/*
 * Drain the queue, then release the allocation pages.  The current
 * page is always freed; the spare is freed only when it is a distinct
 * page.  NOTE(review): "¬_empty" is mojibake for "&not_empty";
 * the drain-loop header is missing from this extract.
 */
432 void test_end(struct queue
*q
)
436 dequeue (q
, ¬_empty
);
438 if (q
->current
!= q
->free
)
439 free_node_page(q
->free
);
440 free_node_page(q
->current
);
/* Print command-line usage to stdout (argv[0] is the program name). */
443 void show_usage(int argc
, char **argv
)
445 printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)", argv
[0]);
446 printf(" [-d delay] (enqueuer period (in loops))");
447 printf(" [-c duration] (dequeuer period (in loops))");
448 printf(" [-v] (verbose output)");
449 printf(" [-a cpu#] [-a cpu#]... (affinity)");
/*
 * Entry point: parse "nr_dequeuers nr_enqueuers duration" plus the
 * -a/-c/-d/-v options, spawn the worker threads, let them run for
 * `duration` seconds, join them and print a summary line.
 * NOTE(review): the argument-count checks, the option letters of the
 * switch, the test_go/test_stop signalling and several error checks
 * are missing from this extract.
 */
453 int main(int argc
, char **argv
)
456 pthread_t
*tid_enqueuer
, *tid_dequeuer
;
458 unsigned long long *count_enqueuer
, *count_dequeuer
;
459 unsigned long long tot_enqueues
= 0, tot_dequeues
= 0;
463 show_usage(argc
, argv
);
/* Positional arguments: dequeuers, enqueuers, duration (seconds). */
467 err
= sscanf(argv
[1], "%u", &nr_dequeuers
);
469 show_usage(argc
, argv
);
473 err
= sscanf(argv
[2], "%u", &nr_enqueuers
);
475 show_usage(argc
, argv
);
479 err
= sscanf(argv
[3], "%lu", &duration
);
481 show_usage(argc
, argv
);
/* Option parsing: -a cpu (affinity), -c (rduration), -d (wdelay), -v. */
485 for (i
= 4; i
< argc
; i
++) {
486 if (argv
[i
][0] != '-')
488 switch (argv
[i
][1]) {
491 show_usage(argc
, argv
);
495 cpu_affinities
[next_aff
++] = a
;
497 printf_verbose("Adding CPU %d affinity\n", a
);
501 show_usage(argc
, argv
);
504 rduration
= atol(argv
[++i
]);
508 show_usage(argc
, argv
);
511 wdelay
= atol(argv
[++i
]);
519 printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n",
520 duration
, nr_enqueuers
, nr_dequeuers
);
521 printf_verbose("Writer delay : %lu loops.\n", rduration
);
522 printf_verbose("Reader duration : %lu loops.\n", wdelay
);
523 printf_verbose("thread %-6s, thread id : %lx, tid %lu\n",
524 "main", pthread_self(), (unsigned long)gettid());
/* Per-thread id and per-thread result arrays, sized from the parsed
   counts.  NOTE(review): malloc results are not checked here. */
526 tid_enqueuer
= malloc(sizeof(*tid_enqueuer
) * nr_enqueuers
);
527 tid_dequeuer
= malloc(sizeof(*tid_dequeuer
) * nr_dequeuers
);
528 count_enqueuer
= malloc(sizeof(*count_enqueuer
) * nr_enqueuers
);
529 count_dequeuer
= malloc(sizeof(*count_dequeuer
) * nr_dequeuers
);
/* Spawn the workers; each thread fills in its own count_* slot. */
534 for (i
= 0; i
< nr_enqueuers
; i
++) {
535 err
= pthread_create(&tid_enqueuer
[i
], NULL
, thr_enqueuer
,
540 for (i
= 0; i
< nr_dequeuers
; i
++) {
541 err
= pthread_create(&tid_dequeuer
[i
], NULL
, thr_dequeuer
,
/* Run for `duration` seconds (loop body not visible in this extract). */
551 for (i
= 0; i
< duration
; i
++) {
/* Join workers and accumulate their per-thread counters. */
559 for (i
= 0; i
< nr_enqueuers
; i
++) {
560 err
= pthread_join(tid_enqueuer
[i
], &tret
);
563 tot_enqueues
+= count_enqueuer
[i
];
565 for (i
= 0; i
< nr_dequeuers
; i
++) {
566 err
= pthread_join(tid_dequeuer
[i
], &tret
);
569 tot_dequeues
+= count_dequeuer
[i
];
572 printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues
,
574 printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu "
576 "rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n",
577 argv
[0], duration
, nr_enqueuers
, wdelay
,
578 nr_dequeuers
, rduration
, tot_enqueues
, tot_dequeues
,
579 nr_successful_dequeues
, tot_enqueues
+ tot_dequeues
);
584 free(count_enqueuer
);
585 free(count_dequeuer
);