hash table comment fix.
[urcu.git] / urcu-defer.c
CommitLineData
786ee85b
MD
1/*
2 * urcu-defer.c
3 *
4 * Userspace RCU library - batch memory reclamation
5 *
6 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23#include <stdio.h>
24#include <pthread.h>
25#include <signal.h>
26#include <assert.h>
27#include <stdlib.h>
28#include <string.h>
29#include <errno.h>
30#include <poll.h>
4ce9e4f2
MD
31#include <sys/time.h>
32#include <syscall.h>
33#include <unistd.h>
786ee85b
MD
34
35#include "urcu-defer-static.h"
36/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
37#include "urcu-defer.h"
38
4ce9e4f2 39#define futex(...) syscall(__NR_futex, __VA_ARGS__)
80cca311
MD
40#define FUTEX_WAIT 0
41#define FUTEX_WAKE 1
4ce9e4f2 42
786ee85b
MD
43void __attribute__((destructor)) urcu_defer_exit(void);
44
45extern void synchronize_rcu(void);
46
47/*
48 * urcu_defer_mutex nests inside defer_thread_mutex.
49 */
50static pthread_mutex_t urcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER;
51static pthread_mutex_t defer_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
52
3dce4bfa 53static int defer_thread_futex;
4ce9e4f2 54
786ee85b
MD
55/*
56 * Written to only by each individual deferer. Read by both the deferer and
57 * the reclamation tread.
58 */
3dce4bfa 59static struct defer_queue __thread defer_queue;
786ee85b
MD
60
61/* Thread IDs of registered deferers */
62#define INIT_NUM_THREADS 4
63
64struct deferer_registry {
65 pthread_t tid;
66 struct defer_queue *defer_queue;
67 unsigned long last_head;
68};
69
70static struct deferer_registry *registry;
71static int num_deferers, alloc_deferers;
72
73static pthread_t tid_defer;
4ce9e4f2 74
786ee85b
MD
/*
 * Acquire @mutex, terminating the process if the lock operation fails.
 *
 * With DISTRUST_SIGNALS_EXTREME defined, the mutex is polled with
 * pthread_mutex_trylock() instead of blocking: EBUSY/EINTR are retried after
 * checking for a pending cancellation and sleeping 10ms; any other error is
 * fatal.
 *
 * pthread mutex functions report failure through their return value and do
 * not set errno, so the code is copied into errno before perror() so the
 * printed reason describes the actual failure.
 */
static void internal_urcu_lock(pthread_mutex_t *mutex)
{
	int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
	ret = pthread_mutex_lock(mutex);
	if (ret) {
		errno = ret;
		perror("Error in pthread mutex lock");
		exit(-1);
	}
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
		if (ret != EBUSY && ret != EINTR) {
			printf("ret = %d, errno = %d\n", ret, errno);
			errno = ret;
			perror("Error in pthread mutex lock");
			exit(-1);
		}
		pthread_testcancel();
		poll(NULL, 0, 10);
	}
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}
97
/*
 * Release @mutex, terminating the process if the unlock fails.
 *
 * pthread_mutex_unlock() returns its error code instead of setting errno, so
 * the code is copied into errno before perror() to report the real reason.
 */
static void internal_urcu_unlock(pthread_mutex_t *mutex)
{
	int ret;

	ret = pthread_mutex_unlock(mutex);
	if (ret) {
		errno = ret;
		perror("Error in pthread mutex unlock");
		exit(-1);
	}
}
108
04eb9c4f
MD
109/*
110 * Wake-up any waiting defer thread. Called from many concurrent threads.
111 */
112static void wake_up_defer(void)
113{
114 if (unlikely(atomic_read(&defer_thread_futex) == -1)) {
115 atomic_set(&defer_thread_futex, 0);
8fd3b219 116 futex(&defer_thread_futex, FUTEX_WAKE, 1,
04eb9c4f
MD
117 NULL, NULL, 0);
118 }
119}
120
121static unsigned long rcu_defer_num_callbacks(void)
122{
123 unsigned long num_items = 0, head;
124 struct deferer_registry *index;
125
126 internal_urcu_lock(&urcu_defer_mutex);
127 for (index = registry; index < registry + num_deferers; index++) {
128 head = LOAD_SHARED(index->defer_queue->head);
129 num_items += head - index->defer_queue->tail;
130 }
131 internal_urcu_unlock(&urcu_defer_mutex);
132 return num_items;
133}
134
135/*
136 * Defer thread waiting. Single thread.
137 */
138static void wait_defer(void)
139{
140 atomic_dec(&defer_thread_futex);
141 smp_mb(); /* Write futex before read queue */
142 if (rcu_defer_num_callbacks()) {
143 smp_mb(); /* Read queue before write futex */
144 /* Callbacks are queued, don't wait. */
145 atomic_set(&defer_thread_futex, 0);
146 } else {
147 smp_rmb(); /* Read queue before read futex */
148 if (atomic_read(&defer_thread_futex) == -1)
149 futex(&defer_thread_futex, FUTEX_WAIT, -1,
150 NULL, NULL, 0);
151 }
152}
153
786ee85b
MD
154/*
155 * Must be called after Q.S. is reached.
156 */
157static void rcu_defer_barrier_queue(struct defer_queue *queue,
804b4375 158 unsigned long head)
786ee85b
MD
159{
160 unsigned long i;
804b4375
MD
161 void (*fct)(void *p);
162 void *p;
786ee85b
MD
163
164 /*
165 * Tail is only modified when lock is held.
166 * Head is only modified by owner thread.
167 */
168
804b4375 169 for (i = queue->tail; i != head;) {
786ee85b 170 smp_rmb(); /* read head before q[]. */
804b4375
MD
171 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
172 if (unlikely(DQ_IS_FCT_BIT(p))) {
804b4375
MD
173 DQ_CLEAR_FCT_BIT(p);
174 queue->last_fct_out = p;
175 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
176 } else if (unlikely(p == DQ_FCT_MARK)) {
804b4375
MD
177 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
178 queue->last_fct_out = p;
179 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
29cdb8d8 180 }
804b4375 181 fct = queue->last_fct_out;
804b4375 182 fct(p);
786ee85b
MD
183 }
184 smp_mb(); /* push tail after having used q[] */
185 STORE_SHARED(queue->tail, i);
186}
187
188static void _rcu_defer_barrier_thread(void)
189{
0d0e6c21 190 unsigned long head, num_items;
786ee85b
MD
191
192 head = defer_queue.head;
0d0e6c21
MD
193 num_items = head - defer_queue.tail;
194 if (unlikely(!num_items))
195 return;
786ee85b
MD
196 synchronize_rcu();
197 rcu_defer_barrier_queue(&defer_queue, head);
198}
199
200
201void rcu_defer_barrier_thread(void)
202{
203 internal_urcu_lock(&urcu_defer_mutex);
204 _rcu_defer_barrier_thread();
205 internal_urcu_unlock(&urcu_defer_mutex);
206}
207
0d0e6c21
MD
208/*
209 * rcu_defer_barrier - Execute all queued rcu callbacks.
210 *
211 * Execute all RCU callbacks queued before rcu_defer_barrier() execution.
212 * All callbacks queued on the local thread prior to a rcu_defer_barrier() call
213 * are guaranteed to be executed.
214 * Callbacks queued by other threads concurrently with rcu_defer_barrier()
215 * execution are not guaranteed to be executed in the current batch (could
216 * be left for the next batch). These callbacks queued by other threads are only
217 * guaranteed to be executed if there is explicit synchronization between
218 * the thread adding to the queue and the thread issuing the defer_barrier call.
219 */
220
786ee85b
MD
221void rcu_defer_barrier(void)
222{
223 struct deferer_registry *index;
0d0e6c21 224 unsigned long num_items = 0;
786ee85b
MD
225
226 if (!registry)
227 return;
228
229 internal_urcu_lock(&urcu_defer_mutex);
0d0e6c21 230 for (index = registry; index < registry + num_deferers; index++) {
786ee85b 231 index->last_head = LOAD_SHARED(index->defer_queue->head);
0d0e6c21
MD
232 num_items += index->last_head - index->defer_queue->tail;
233 }
234 if (likely(!num_items)) {
235 /*
236 * We skip the grace period because there are no queued
237 * callbacks to execute.
238 */
239 goto end;
240 }
786ee85b
MD
241 synchronize_rcu();
242 for (index = registry; index < registry + num_deferers; index++)
243 rcu_defer_barrier_queue(index->defer_queue,
244 index->last_head);
0d0e6c21 245end:
786ee85b
MD
246 internal_urcu_unlock(&urcu_defer_mutex);
247}
248
2c22932b
MD
249/*
250 * _rcu_defer_queue - Queue a RCU callback.
251 */
252void _rcu_defer_queue(void (*fct)(void *p), void *p)
253{
254 unsigned long head, tail;
255
256 /*
257 * Head is only modified by ourself. Tail can be modified by reclamation
258 * thread.
259 */
260 head = defer_queue.head;
261 tail = LOAD_SHARED(defer_queue.tail);
262
263 /*
264 * If queue is full, empty it ourself.
265 * Worse-case: must allow 2 supplementary entries for fct pointer.
266 */
267 if (unlikely(head - tail >= DEFER_QUEUE_SIZE - 2)) {
268 assert(head - tail <= DEFER_QUEUE_SIZE);
269 rcu_defer_barrier_thread();
270 assert(head - LOAD_SHARED(defer_queue.tail) == 0);
271 }
272
273 if (unlikely(defer_queue.last_fct_in != fct)) {
274 defer_queue.last_fct_in = fct;
275 if (unlikely(DQ_IS_FCT_BIT(fct) || fct == DQ_FCT_MARK)) {
276 /*
277 * If the function to encode is not aligned or the
278 * marker, write DQ_FCT_MARK followed by the function
279 * pointer.
280 */
281 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
282 DQ_FCT_MARK);
283 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
284 fct);
285 } else {
286 DQ_SET_FCT_BIT(fct);
287 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
288 fct);
289 }
290 } else {
291 if (unlikely(DQ_IS_FCT_BIT(p) || p == DQ_FCT_MARK)) {
292 /*
293 * If the data to encode is not aligned or the marker,
294 * write DQ_FCT_MARK followed by the function pointer.
295 */
296 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
297 DQ_FCT_MARK);
298 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
299 fct);
300 }
301 }
302 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], p);
303 smp_wmb(); /* Publish new pointer before head */
304 /* Write q[] before head. */
305 STORE_SHARED(defer_queue.head, head);
04eb9c4f 306 smp_mb(); /* Write queue head before read futex */
2c22932b
MD
307 /*
308 * Wake-up any waiting defer thread.
309 */
310 wake_up_defer();
311}
312
/*
 * Body of the reclamation thread. It repeatedly parks in wait_defer() until
 * callbacks are queued, lets more accumulate for 100ms, then drains every
 * registered queue. It runs until cancelled by stop_defer_thread().
 */
void *thr_defer(void *args)
{
	while (1) {
		pthread_testcancel();
		/*
		 * "Be green": block instead of polling, so an idle process
		 * leaves the processor in its sleep state (saves laptop
		 * battery life).
		 */
		wait_defer();
		/* Batch up: give other threads time to enqueue more work. */
		poll(NULL, 0, 100);	/* wait for 100ms */
		rcu_defer_barrier();
	}

	return NULL;
}
330
/*
 * Library wrappers, to be used by source code that is not LGPL-compatible.
 */

/*
 * rcu_defer_queue - Out-of-line entry point forwarding to _rcu_defer_queue().
 */
void rcu_defer_queue(void (*fct)(void *p), void *p)
{
	_rcu_defer_queue(fct, p);
}
339
340static void rcu_add_deferer(pthread_t id)
341{
342 struct deferer_registry *oldarray;
343
344 if (!registry) {
345 alloc_deferers = INIT_NUM_THREADS;
346 num_deferers = 0;
347 registry =
348 malloc(sizeof(struct deferer_registry) * alloc_deferers);
349 }
350 if (alloc_deferers < num_deferers + 1) {
351 oldarray = registry;
352 registry = malloc(sizeof(struct deferer_registry)
353 * (alloc_deferers << 1));
354 memcpy(registry, oldarray,
355 sizeof(struct deferer_registry) * alloc_deferers);
356 alloc_deferers <<= 1;
357 free(oldarray);
358 }
359 registry[num_deferers].tid = id;
360 /* reference to the TLS of _this_ deferer thread. */
361 registry[num_deferers].defer_queue = &defer_queue;
804b4375 362 registry[num_deferers].last_head = 0;
786ee85b
MD
363 num_deferers++;
364}
365
366/*
367 * Never shrink (implementation limitation).
368 * This is O(nb threads). Eventually use a hash table.
369 */
370static void rcu_remove_deferer(pthread_t id)
371{
372 struct deferer_registry *index;
373
374 assert(registry != NULL);
375 for (index = registry; index < registry + num_deferers; index++) {
376 if (pthread_equal(index->tid, id)) {
377 memcpy(index, &registry[num_deferers - 1],
378 sizeof(struct deferer_registry));
379 registry[num_deferers - 1].tid = 0;
380 registry[num_deferers - 1].defer_queue = NULL;
804b4375 381 registry[num_deferers - 1].last_head = 0;
786ee85b
MD
382 num_deferers--;
383 return;
384 }
385 }
386 /* Hrm not found, forgot to register ? */
387 assert(0);
388}
389
390static void start_defer_thread(void)
391{
392 int ret;
393
394 ret = pthread_create(&tid_defer, NULL, thr_defer,
395 NULL);
396 assert(!ret);
397}
398
399static void stop_defer_thread(void)
400{
401 int ret;
402 void *tret;
403
4ce9e4f2
MD
404 pthread_cancel(tid_defer);
405 wake_up_defer();
786ee85b
MD
406 ret = pthread_join(tid_defer, &tret);
407 assert(!ret);
408}
409
410void rcu_defer_register_thread(void)
411{
412 int deferers;
413
414 internal_urcu_lock(&defer_thread_mutex);
415 internal_urcu_lock(&urcu_defer_mutex);
416 defer_queue.q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
417 rcu_add_deferer(pthread_self());
418 deferers = num_deferers;
419 internal_urcu_unlock(&urcu_defer_mutex);
420
421 if (deferers == 1)
422 start_defer_thread();
423 internal_urcu_unlock(&defer_thread_mutex);
424}
425
426void rcu_defer_unregister_thread(void)
427{
428 int deferers;
429
430 internal_urcu_lock(&defer_thread_mutex);
431 internal_urcu_lock(&urcu_defer_mutex);
432 rcu_remove_deferer(pthread_self());
433 _rcu_defer_barrier_thread();
434 free(defer_queue.q);
435 defer_queue.q = NULL;
436 deferers = num_deferers;
437 internal_urcu_unlock(&urcu_defer_mutex);
438
439 if (deferers == 0)
440 stop_defer_thread();
441 internal_urcu_unlock(&defer_thread_mutex);
442}
443
444void urcu_defer_exit(void)
445{
446 free(registry);
447}
This page took 0.038726 seconds and 4 git commands to generate.