Document the new call_rcu() primitives.
[urcu.git] / urcu-call-rcu-impl.h
CommitLineData
b57aee66
PM
1/*
2 * urcu-call-rcu.c
3 *
4 * Userspace RCU library - batch memory reclamation with kernel API
5 *
6 * Copyright (c) 2010 Paul E. McKenney <paulmck@linux.vnet.ibm.com>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23#include <stdio.h>
24#include <pthread.h>
25#include <signal.h>
26#include <assert.h>
27#include <stdlib.h>
28#include <string.h>
29#include <errno.h>
30#include <poll.h>
31#include <sys/time.h>
32#include <syscall.h>
33#include <unistd.h>
34
35#include "config.h"
36#include "urcu/wfqueue.h"
37#include "urcu-call-rcu.h"
38#include "urcu-pointer.h"
3c24913f 39#include "urcu/list.h"
b57aee66
PM
40
41/* Data structure that identifies a call_rcu thread. */
42
43struct call_rcu_data {
44 struct cds_wfq_queue cbs;
45 unsigned long flags;
46 pthread_mutex_t mtx;
47 pthread_cond_t cond;
48 unsigned long qlen;
49 pthread_t tid;
3c24913f 50 struct cds_list_head list;
b57aee66
PM
51} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
52
3c24913f
PM
53/*
54 * List of all call_rcu_data structures to keep valgrind happy.
55 * Protected by call_rcu_mutex.
56 */
57
58CDS_LIST_HEAD(call_rcu_data_list);
59
b57aee66
PM
60/* Link a thread using call_rcu() to its call_rcu thread. */
61
62static __thread struct call_rcu_data *thread_call_rcu_data;
63
64/* Guard call_rcu thread creation. */
65
66static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
67
68/* If a given thread does not have its own call_rcu thread, this is default. */
69
70static struct call_rcu_data *default_call_rcu_data;
71
b57aee66
PM
72/*
73 * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
74 * available, then we can have call_rcu threads assigned to individual
75 * CPUs rather than only to specific threads.
76 */
77
78#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
79
80/*
81 * Pointer to array of pointers to per-CPU call_rcu_data structures
82 * and # CPUs.
83 */
84
85static struct call_rcu_data **per_cpu_call_rcu_data;
86static long maxcpus;
87
88/* Allocate the array if it has not already been allocated. */
89
90static void alloc_cpu_call_rcu_data(void)
91{
92 struct call_rcu_data **p;
93 static int warned = 0;
94
95 if (maxcpus != 0)
96 return;
97 maxcpus = sysconf(_SC_NPROCESSORS_CONF);
98 if (maxcpus <= 0) {
99 return;
100 }
101 p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
102 if (p != NULL) {
103 memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
104 per_cpu_call_rcu_data = p;
105 } else {
106 if (!warned) {
107 fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
108 }
109 warned = 1;
110 }
111}
112
113#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
114
115static const struct call_rcu_data **per_cpu_call_rcu_data = NULL;
116static const long maxcpus = -1;
117
118static void alloc_cpu_call_rcu_data(void)
119{
120}
121
122static int sched_getcpu(void)
123{
124 return -1;
125}
126
127#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
128
129/* Acquire the specified pthread mutex. */
130
131static void call_rcu_lock(pthread_mutex_t *pmp)
132{
133 if (pthread_mutex_lock(pmp) != 0) {
134 perror("pthread_mutex_lock");
135 exit(-1);
136 }
137}
138
139/* Release the specified pthread mutex. */
140
141static void call_rcu_unlock(pthread_mutex_t *pmp)
142{
143 if (pthread_mutex_unlock(pmp) != 0) {
144 perror("pthread_mutex_unlock");
145 exit(-1);
146 }
147}
148
149/* This is the code run by each call_rcu thread. */
150
151static void *call_rcu_thread(void *arg)
152{
153 unsigned long cbcount;
154 struct cds_wfq_node *cbs;
155 struct cds_wfq_node **cbs_tail;
156 struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
157 struct rcu_head *rhp;
158
159 thread_call_rcu_data = crdp;
160 for (;;) {
161 if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
162 while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
163 poll(NULL, 0, 1);
164 _CMM_STORE_SHARED(crdp->cbs.head, NULL);
165 cbs_tail = (struct cds_wfq_node **)
166 uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
167 synchronize_rcu();
168 cbcount = 0;
169 do {
170 while (cbs->next == NULL &&
171 &cbs->next != cbs_tail)
172 poll(NULL, 0, 1);
173 if (cbs == &crdp->cbs.dummy) {
174 cbs = cbs->next;
175 continue;
176 }
177 rhp = (struct rcu_head *)cbs;
178 cbs = cbs->next;
179 rhp->func(rhp);
180 cbcount++;
181 } while (cbs != NULL);
182 uatomic_sub(&crdp->qlen, cbcount);
183 }
7106ddf8
PM
184 if (crdp->flags & URCU_CALL_RCU_STOP)
185 break;
b57aee66
PM
186 if (crdp->flags & URCU_CALL_RCU_RT)
187 poll(NULL, 0, 10);
188 else {
189 call_rcu_lock(&crdp->mtx);
190 _CMM_STORE_SHARED(crdp->flags,
191 crdp->flags & ~URCU_CALL_RCU_RUNNING);
192 if (&crdp->cbs.head ==
193 _CMM_LOAD_SHARED(crdp->cbs.tail) &&
194 pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
195 perror("pthread_cond_wait");
196 exit(-1);
197 }
198 _CMM_STORE_SHARED(crdp->flags,
199 crdp->flags | URCU_CALL_RCU_RUNNING);
200 poll(NULL, 0, 10);
201 call_rcu_unlock(&crdp->mtx);
202 }
203 }
7106ddf8
PM
204 call_rcu_lock(&crdp->mtx);
205 crdp->flags |= URCU_CALL_RCU_STOPPED;
206 call_rcu_unlock(&crdp->mtx);
207 return NULL;
b57aee66
PM
208}
209
210/*
211 * Create both a call_rcu thread and the corresponding call_rcu_data
3c24913f
PM
212 * structure, linking the structure in as specified. Caller must hold
213 * call_rcu_mutex.
b57aee66
PM
214 */
215
3c24913f
PM
216static void call_rcu_data_init(struct call_rcu_data **crdpp,
217 unsigned long flags)
b57aee66
PM
218{
219 struct call_rcu_data *crdp;
220
221 crdp = malloc(sizeof(*crdp));
222 if (crdp == NULL) {
223 fprintf(stderr, "Out of memory.\n");
224 exit(-1);
225 }
226 memset(crdp, '\0', sizeof(*crdp));
227 cds_wfq_init(&crdp->cbs);
228 crdp->qlen = 0;
229 if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
230 perror("pthread_mutex_init");
231 exit(-1);
232 }
233 if (pthread_cond_init(&crdp->cond, NULL) != 0) {
234 perror("pthread_cond_init");
235 exit(-1);
236 }
237 crdp->flags = flags | URCU_CALL_RCU_RUNNING;
3c24913f 238 cds_list_add(&crdp->list, &call_rcu_data_list);
b57aee66
PM
239 cmm_smp_mb(); /* Structure initialized before pointer is planted. */
240 *crdpp = crdp;
241 if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
242 perror("pthread_create");
243 exit(-1);
244 }
245}
246
247/*
248 * Return a pointer to the call_rcu_data structure for the specified
249 * CPU, returning NULL if there is none. We cannot automatically
250 * created it because the platform we are running on might not define
251 * sched_getcpu().
252 */
253
254struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
255{
256 static int warned = 0;
257
258 if (per_cpu_call_rcu_data == NULL)
259 return NULL;
260 if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
261 fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
262 warned = 1;
263 }
264 if (cpu < 0 || maxcpus <= cpu)
265 return NULL;
266 return per_cpu_call_rcu_data[cpu];
267}
268
269/*
270 * Return the tid corresponding to the call_rcu thread whose
271 * call_rcu_data structure is specified.
272 */
273
274pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
275{
276 return crdp->tid;
277}
278
279/*
280 * Create a call_rcu_data structure (with thread) and return a pointer.
281 */
282
3c24913f 283static struct call_rcu_data *__create_call_rcu_data(unsigned long flags)
b57aee66
PM
284{
285 struct call_rcu_data *crdp;
286
287 call_rcu_data_init(&crdp, flags);
288 return crdp;
289}
290
3c24913f
PM
291struct call_rcu_data *create_call_rcu_data(unsigned long flags)
292{
293 struct call_rcu_data *crdp;
294
295 call_rcu_lock(&call_rcu_mutex);
296 crdp = __create_call_rcu_data(flags);
297 call_rcu_unlock(&call_rcu_mutex);
298 return crdp;
299}
300
b57aee66
PM
301/*
302 * Set the specified CPU to use the specified call_rcu_data structure.
7106ddf8
PM
303 *
304 * Use NULL to remove a CPU's call_rcu_data structure, but it is
305 * the caller's responsibility to dispose of the removed structure.
306 * Use get_cpu_call_rcu_data() to obtain a pointer to the old structure
307 * (prior to NULLing it out, of course).
b57aee66
PM
308 */
309
310int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
311{
312 int warned = 0;
313
314 call_rcu_lock(&call_rcu_mutex);
315 if (cpu < 0 || maxcpus <= cpu) {
316 if (!warned) {
317 fprintf(stderr, "[error] liburcu: set CPU # out of range\n");
318 warned = 1;
319 }
320 call_rcu_unlock(&call_rcu_mutex);
321 errno = EINVAL;
322 return -EINVAL;
323 }
324 alloc_cpu_call_rcu_data();
325 call_rcu_unlock(&call_rcu_mutex);
326 if (per_cpu_call_rcu_data == NULL) {
327 errno = ENOMEM;
328 return -ENOMEM;
329 }
330 per_cpu_call_rcu_data[cpu] = crdp;
331 return 0;
332}
333
334/*
335 * Return a pointer to the default call_rcu_data structure, creating
336 * one if need be. Because we never free call_rcu_data structures,
337 * we don't need to be in an RCU read-side critical section.
338 */
339
340struct call_rcu_data *get_default_call_rcu_data(void)
341{
342 if (default_call_rcu_data != NULL)
343 return rcu_dereference(default_call_rcu_data);
344 call_rcu_lock(&call_rcu_mutex);
345 if (default_call_rcu_data != NULL) {
346 call_rcu_unlock(&call_rcu_mutex);
347 return default_call_rcu_data;
348 }
349 call_rcu_data_init(&default_call_rcu_data, 0);
350 call_rcu_unlock(&call_rcu_mutex);
351 return default_call_rcu_data;
352}
353
354/*
355 * Return the call_rcu_data structure that applies to the currently
356 * running thread. Any call_rcu_data structure assigned specifically
357 * to this thread has first priority, followed by any call_rcu_data
358 * structure assigned to the CPU on which the thread is running,
359 * followed by the default call_rcu_data structure. If there is not
360 * yet a default call_rcu_data structure, one will be created.
361 */
362struct call_rcu_data *get_call_rcu_data(void)
363{
364 int curcpu;
365 static int warned = 0;
366
367 if (thread_call_rcu_data != NULL)
368 return thread_call_rcu_data;
369 if (maxcpus <= 0)
370 return get_default_call_rcu_data();
371 curcpu = sched_getcpu();
372 if (!warned && (curcpu < 0 || maxcpus <= curcpu)) {
373 fprintf(stderr, "[error] liburcu: gcrd CPU # out of range\n");
374 warned = 1;
375 }
376 if (curcpu >= 0 && maxcpus > curcpu &&
377 per_cpu_call_rcu_data != NULL &&
378 per_cpu_call_rcu_data[curcpu] != NULL)
379 return per_cpu_call_rcu_data[curcpu];
380 return get_default_call_rcu_data();
381}
382
383/*
384 * Return a pointer to this task's call_rcu_data if there is one.
385 */
386
387struct call_rcu_data *get_thread_call_rcu_data(void)
388{
389 return thread_call_rcu_data;
390}
391
392/*
393 * Set this task's call_rcu_data structure as specified, regardless
394 * of whether or not this task already had one. (This allows switching
395 * to and from real-time call_rcu threads, for example.)
7106ddf8
PM
396 *
397 * Use NULL to remove a thread's call_rcu_data structure, but it is
398 * the caller's responsibility to dispose of the removed structure.
399 * Use get_thread_call_rcu_data() to obtain a pointer to the old structure
400 * (prior to NULLing it out, of course).
b57aee66
PM
401 */
402
403void set_thread_call_rcu_data(struct call_rcu_data *crdp)
404{
405 thread_call_rcu_data = crdp;
406}
407
408/*
409 * Create a separate call_rcu thread for each CPU. This does not
410 * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
411 * function if you want that behavior.
412 */
413
414int create_all_cpu_call_rcu_data(unsigned long flags)
415{
416 int i;
417 struct call_rcu_data *crdp;
418 int ret;
419
420 call_rcu_lock(&call_rcu_mutex);
421 alloc_cpu_call_rcu_data();
422 call_rcu_unlock(&call_rcu_mutex);
423 if (maxcpus <= 0) {
424 errno = EINVAL;
425 return -EINVAL;
426 }
427 if (per_cpu_call_rcu_data == NULL) {
428 errno = ENOMEM;
429 return -ENOMEM;
430 }
431 for (i = 0; i < maxcpus; i++) {
432 call_rcu_lock(&call_rcu_mutex);
433 if (get_cpu_call_rcu_data(i)) {
434 call_rcu_unlock(&call_rcu_mutex);
435 continue;
436 }
3c24913f 437 crdp = __create_call_rcu_data(flags);
b57aee66
PM
438 if (crdp == NULL) {
439 call_rcu_unlock(&call_rcu_mutex);
440 errno = ENOMEM;
441 return -ENOMEM;
442 }
443 call_rcu_unlock(&call_rcu_mutex);
444 if ((ret = set_cpu_call_rcu_data(i, crdp)) != 0) {
445 /* FIXME: Leaks crdp for now. */
446 return ret; /* Can happen on race. */
447 }
448 }
449 return 0;
450}
451
7106ddf8
PM
452/*
453 * Wake up the call_rcu thread corresponding to the specified
454 * call_rcu_data structure.
455 */
456static void wake_call_rcu_thread(struct call_rcu_data *crdp)
457{
458 if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
459 call_rcu_lock(&crdp->mtx);
460 if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
461 if (pthread_cond_signal(&crdp->cond) != 0) {
462 perror("pthread_cond_signal");
463 exit(-1);
464 }
465 }
466 call_rcu_unlock(&crdp->mtx);
467 }
468}
469
b57aee66
PM
470/*
471 * Schedule a function to be invoked after a following grace period.
472 * This is the only function that must be called -- the others are
473 * only present to allow applications to tune their use of RCU for
474 * maximum performance.
475 *
476 * Note that unless a call_rcu thread has not already been created,
477 * the first invocation of call_rcu() will create one. So, if you
478 * need the first invocation of call_rcu() to be fast, make sure
479 * to create a call_rcu thread first. One way to accomplish this is
480 * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
481 */
482
483void call_rcu(struct rcu_head *head,
484 void (*func)(struct rcu_head *head))
485{
486 struct call_rcu_data *crdp;
487
488 cds_wfq_node_init(&head->next);
489 head->func = func;
490 crdp = get_call_rcu_data();
491 cds_wfq_enqueue(&crdp->cbs, &head->next);
492 uatomic_inc(&crdp->qlen);
7106ddf8
PM
493 wake_call_rcu_thread(crdp);
494}
495
496/*
497 * Free up the specified call_rcu_data structure, terminating the
498 * associated call_rcu thread. The caller must have previously
499 * removed the call_rcu_data structure from per-thread or per-CPU
500 * usage. For example, set_cpu_call_rcu_data(cpu, NULL) for per-CPU
501 * call_rcu_data structures or set_thread_call_rcu_data(NULL) for
502 * per-thread call_rcu_data structures.
503 *
504 * We silently refuse to free up the default call_rcu_data structure
505 * because that is where we put any leftover callbacks. Note that
506 * the possibility of self-spawning callbacks makes it impossible
507 * to execute all the callbacks in finite time without putting any
508 * newly spawned callbacks somewhere else. The "somewhere else" of
509 * last resort is the default call_rcu_data structure.
510 *
511 * We also silently refuse to free NULL pointers. This simplifies
512 * the calling code.
513 */
514void call_rcu_data_free(struct call_rcu_data *crdp)
515{
516 struct cds_wfq_node *cbs;
517 struct cds_wfq_node **cbs_tail;
518 struct cds_wfq_node **cbs_endprev;
519
520 if (crdp == NULL || crdp == default_call_rcu_data) {
521 return;
522 }
523 if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) {
b57aee66 524 call_rcu_lock(&crdp->mtx);
7106ddf8 525 crdp->flags |= URCU_CALL_RCU_STOP;
b57aee66 526 call_rcu_unlock(&crdp->mtx);
7106ddf8
PM
527 wake_call_rcu_thread(crdp);
528 while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0)
529 poll(NULL, 0, 1);
530 }
531 if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
532 while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
533 poll(NULL, 0, 1);
534 _CMM_STORE_SHARED(crdp->cbs.head, NULL);
535 cbs_tail = (struct cds_wfq_node **)
536 uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
537 cbs_endprev = (struct cds_wfq_node **)
538 uatomic_xchg(&default_call_rcu_data, cbs_tail);
539 *cbs_endprev = cbs;
540 uatomic_add(&default_call_rcu_data->qlen,
541 uatomic_read(&crdp->qlen));
542 cds_list_del(&crdp->list);
543 free(crdp);
544 }
545}
546
547/*
548 * Clean up all the per-CPU call_rcu threads.
549 */
550void free_all_cpu_call_rcu_data(void)
551{
552 int cpu;
553 struct call_rcu_data *crdp;
554
555 if (maxcpus <= 0)
556 return;
557 for (cpu = 0; cpu < maxcpus; cpu++) {
558 crdp = get_cpu_call_rcu_data(cpu);
559 if (crdp == NULL)
560 continue;
561 set_cpu_call_rcu_data(cpu, NULL);
562 call_rcu_data_free(crdp);
563 }
564}
565
81ad2e19
PM
566/*
567 * Acquire the call_rcu_mutex in order to ensure that the child sees
568 * all of the call_rcu() data structures in a consistent state.
569 * Suitable for pthread_atfork() and friends.
570 */
571void call_rcu_before_fork(void)
572{
573 call_rcu_lock(&call_rcu_mutex);
574}
575
576/*
577 * Clean up call_rcu data structures in the parent of a successful fork()
578 * that is not followed by exec() in the child. Suitable for
579 * pthread_atfork() and friends.
580 */
581void call_rcu_after_fork_parent(void)
582{
583 call_rcu_unlock(&call_rcu_mutex);
584}
585
7106ddf8
PM
586/*
587 * Clean up call_rcu data structures in the child of a successful fork()
81ad2e19
PM
588 * that is not followed by exec(). Suitable for pthread_atfork() and
589 * friends.
7106ddf8
PM
590 */
591void call_rcu_after_fork_child(void)
592{
593 struct call_rcu_data *crdp;
594
81ad2e19
PM
595 /* Release the mutex. */
596 call_rcu_unlock(&call_rcu_mutex);
597
7106ddf8
PM
598 /*
599 * Allocate a new default call_rcu_data structure in order
600 * to get a working call_rcu thread to go with it.
601 */
602 default_call_rcu_data = NULL;
603 (void)get_default_call_rcu_data();
604
605 /* Dispose of all of the rest of the call_rcu_data structures. */
606 while (call_rcu_data_list.next != call_rcu_data_list.prev) {
607 crdp = cds_list_entry(call_rcu_data_list.prev,
608 struct call_rcu_data, list);
609 if (crdp == default_call_rcu_data)
610 crdp = cds_list_entry(crdp->list.prev,
611 struct call_rcu_data, list);
612 crdp->flags = URCU_CALL_RCU_STOPPED;
613 call_rcu_data_free(crdp);
b57aee66
PM
614 }
615}
This page took 0.070864 seconds and 4 git commands to generate.