urcu-defer: fix futex wakeup value
[userspace-rcu.git] / urcu-defer.c
1 /*
2 * urcu-defer.c
3 *
4 * Userspace RCU library - batch memory reclamation
5 *
6 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23 #include <stdio.h>
24 #include <pthread.h>
25 #include <signal.h>
26 #include <assert.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <poll.h>
31 #include <sys/time.h>
32 #include <syscall.h>
33 #include <unistd.h>
34
35 #include "urcu-defer-static.h"
36 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
37 #include "urcu-defer.h"
38
39 #define futex(...) syscall(__NR_futex, __VA_ARGS__)
40 #define FUTEX_WAIT 0
41 #define FUTEX_WAKE 1
42
43 void __attribute__((destructor)) urcu_defer_exit(void);
44
45 extern void synchronize_rcu(void);
46
47 /*
48 * urcu_defer_mutex nests inside defer_thread_mutex.
49 */
50 static pthread_mutex_t urcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER;
51 static pthread_mutex_t defer_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
52
53 static int defer_thread_futex;
54
55 /*
56 * Written to only by each individual deferer. Read by both the deferer and
57 * the reclamation tread.
58 */
59 static struct defer_queue __thread defer_queue;
60
/* Initial number of slots allocated for the registry of deferers. */
#define INIT_NUM_THREADS	4

/* One slot per registered deferer thread. */
struct deferer_registry {
	pthread_t tid;				/* thread ID of the deferer */
	struct defer_queue *defer_queue;	/* that thread's defer queue */
	unsigned long last_head;		/* head sampled by rcu_defer_barrier() */
};

/* Array of registered deferers, its used slot count and its capacity. */
static struct deferer_registry *registry;
static int num_deferers;
static int alloc_deferers;

/* Thread ID of the reclamation (defer) thread. */
static pthread_t tid_defer;
74
/*
 * Acquire @mutex, terminating the process with exit(-1) on failure.
 *
 * When DISTRUST_SIGNALS_EXTREME is defined, the lock is taken by polling
 * pthread_mutex_trylock() every 10ms, with a cancellation point between
 * attempts, instead of blocking in pthread_mutex_lock().
 */
static void internal_urcu_lock(pthread_mutex_t *mutex)
{
	int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
	ret = pthread_mutex_lock(mutex);
	if (ret) {
		/*
		 * pthread functions return the error number and leave errno
		 * untouched: hand it to perror() explicitly.
		 */
		errno = ret;
		perror("Error in pthread mutex lock");
		exit(-1);
	}
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
		if (ret != EBUSY && ret != EINTR) {
			printf("ret = %d, errno = %d\n", ret, errno);
			/* Report the code returned by trylock, not stale errno. */
			errno = ret;
			perror("Error in pthread mutex lock");
			exit(-1);
		}
		pthread_testcancel();
		poll(NULL,0,10);
	}
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}
97
/*
 * Release @mutex, terminating the process with exit(-1) on failure.
 */
static void internal_urcu_unlock(pthread_mutex_t *mutex)
{
	int ret;

	ret = pthread_mutex_unlock(mutex);
	if (ret) {
		/*
		 * pthread functions return the error number and leave errno
		 * untouched: hand it to perror() explicitly.
		 */
		errno = ret;
		perror("Error in pthread mutex unlock");
		exit(-1);
	}
}
108
109 /*
110 * Wake-up any waiting defer thread. Called from many concurrent threads.
111 */
112 static void wake_up_defer(void)
113 {
114 if (unlikely(atomic_read(&defer_thread_futex) == -1)) {
115 atomic_set(&defer_thread_futex, 0);
116 futex(&defer_thread_futex, FUTEX_WAKE, 1,
117 NULL, NULL, 0);
118 }
119 }
120
121 static unsigned long rcu_defer_num_callbacks(void)
122 {
123 unsigned long num_items = 0, head;
124 struct deferer_registry *index;
125
126 internal_urcu_lock(&urcu_defer_mutex);
127 for (index = registry; index < registry + num_deferers; index++) {
128 head = LOAD_SHARED(index->defer_queue->head);
129 num_items += head - index->defer_queue->tail;
130 }
131 internal_urcu_unlock(&urcu_defer_mutex);
132 return num_items;
133 }
134
135 /*
136 * Defer thread waiting. Single thread.
137 */
138 static void wait_defer(void)
139 {
140 atomic_dec(&defer_thread_futex);
141 smp_mb(); /* Write futex before read queue */
142 if (rcu_defer_num_callbacks()) {
143 smp_mb(); /* Read queue before write futex */
144 /* Callbacks are queued, don't wait. */
145 atomic_set(&defer_thread_futex, 0);
146 } else {
147 smp_rmb(); /* Read queue before read futex */
148 if (atomic_read(&defer_thread_futex) == -1)
149 futex(&defer_thread_futex, FUTEX_WAIT, -1,
150 NULL, NULL, 0);
151 }
152 }
153
154 /*
155 * Must be called after Q.S. is reached.
156 */
157 static void rcu_defer_barrier_queue(struct defer_queue *queue,
158 unsigned long head)
159 {
160 unsigned long i;
161 void (*fct)(void *p);
162 void *p;
163
164 /*
165 * Tail is only modified when lock is held.
166 * Head is only modified by owner thread.
167 */
168
169 for (i = queue->tail; i != head;) {
170 smp_rmb(); /* read head before q[]. */
171 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
172 if (unlikely(DQ_IS_FCT_BIT(p))) {
173 DQ_CLEAR_FCT_BIT(p);
174 queue->last_fct_out = p;
175 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
176 } else if (unlikely(p == DQ_FCT_MARK)) {
177 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
178 queue->last_fct_out = p;
179 p = LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
180 }
181 fct = queue->last_fct_out;
182 fct(p);
183 }
184 smp_mb(); /* push tail after having used q[] */
185 STORE_SHARED(queue->tail, i);
186 }
187
188 static void _rcu_defer_barrier_thread(void)
189 {
190 unsigned long head, num_items;
191
192 head = defer_queue.head;
193 num_items = head - defer_queue.tail;
194 if (unlikely(!num_items))
195 return;
196 synchronize_rcu();
197 rcu_defer_barrier_queue(&defer_queue, head);
198 }
199
200
201 void rcu_defer_barrier_thread(void)
202 {
203 internal_urcu_lock(&urcu_defer_mutex);
204 _rcu_defer_barrier_thread();
205 internal_urcu_unlock(&urcu_defer_mutex);
206 }
207
208 /*
209 * rcu_defer_barrier - Execute all queued rcu callbacks.
210 *
211 * Execute all RCU callbacks queued before rcu_defer_barrier() execution.
212 * All callbacks queued on the local thread prior to a rcu_defer_barrier() call
213 * are guaranteed to be executed.
214 * Callbacks queued by other threads concurrently with rcu_defer_barrier()
215 * execution are not guaranteed to be executed in the current batch (could
216 * be left for the next batch). These callbacks queued by other threads are only
217 * guaranteed to be executed if there is explicit synchronization between
218 * the thread adding to the queue and the thread issuing the defer_barrier call.
219 */
220
221 void rcu_defer_barrier(void)
222 {
223 struct deferer_registry *index;
224 unsigned long num_items = 0;
225
226 if (!registry)
227 return;
228
229 internal_urcu_lock(&urcu_defer_mutex);
230 for (index = registry; index < registry + num_deferers; index++) {
231 index->last_head = LOAD_SHARED(index->defer_queue->head);
232 num_items += index->last_head - index->defer_queue->tail;
233 }
234 if (likely(!num_items)) {
235 /*
236 * We skip the grace period because there are no queued
237 * callbacks to execute.
238 */
239 goto end;
240 }
241 synchronize_rcu();
242 for (index = registry; index < registry + num_deferers; index++)
243 rcu_defer_barrier_queue(index->defer_queue,
244 index->last_head);
245 end:
246 internal_urcu_unlock(&urcu_defer_mutex);
247 }
248
249 /*
250 * _rcu_defer_queue - Queue a RCU callback.
251 */
252 void _rcu_defer_queue(void (*fct)(void *p), void *p)
253 {
254 unsigned long head, tail;
255
256 /*
257 * Head is only modified by ourself. Tail can be modified by reclamation
258 * thread.
259 */
260 head = defer_queue.head;
261 tail = LOAD_SHARED(defer_queue.tail);
262
263 /*
264 * If queue is full, empty it ourself.
265 * Worse-case: must allow 2 supplementary entries for fct pointer.
266 */
267 if (unlikely(head - tail >= DEFER_QUEUE_SIZE - 2)) {
268 assert(head - tail <= DEFER_QUEUE_SIZE);
269 rcu_defer_barrier_thread();
270 assert(head - LOAD_SHARED(defer_queue.tail) == 0);
271 }
272
273 if (unlikely(defer_queue.last_fct_in != fct)) {
274 defer_queue.last_fct_in = fct;
275 if (unlikely(DQ_IS_FCT_BIT(fct) || fct == DQ_FCT_MARK)) {
276 /*
277 * If the function to encode is not aligned or the
278 * marker, write DQ_FCT_MARK followed by the function
279 * pointer.
280 */
281 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
282 DQ_FCT_MARK);
283 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
284 fct);
285 } else {
286 DQ_SET_FCT_BIT(fct);
287 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
288 fct);
289 }
290 } else {
291 if (unlikely(DQ_IS_FCT_BIT(p) || p == DQ_FCT_MARK)) {
292 /*
293 * If the data to encode is not aligned or the marker,
294 * write DQ_FCT_MARK followed by the function pointer.
295 */
296 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
297 DQ_FCT_MARK);
298 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK],
299 fct);
300 }
301 }
302 _STORE_SHARED(defer_queue.q[head++ & DEFER_QUEUE_MASK], p);
303 smp_wmb(); /* Publish new pointer before head */
304 /* Write q[] before head. */
305 STORE_SHARED(defer_queue.head, head);
306 smp_mb(); /* Write queue head before read futex */
307 /*
308 * Wake-up any waiting defer thread.
309 */
310 wake_up_defer();
311 }
312
/*
 * Body of the defer (reclamation) thread. Loops forever; each iteration
 * starts with a cancellation point. @args is unused.
 */
void *thr_defer(void *args)
{
	while (1) {
		pthread_testcancel();
		/*
		 * "Be green": do not wake up the CPU when there is no RCU
		 * work to perform at all. Aims at saving laptop battery life
		 * by leaving the processor in sleep state when idle.
		 */
		wait_defer();
		/* Sleep after wait_defer to let many callbacks enqueue. */
		poll(NULL, 0, 100);	/* wait for 100ms */
		rcu_defer_barrier();
	}

	return NULL;
}
330
/*
 * Library wrapper to be used by non-LGPL compatible source code: exported
 * symbol that forwards to _rcu_defer_queue().
 */
void rcu_defer_queue(void (*fct)(void *p), void *p)
{
	/* Forward both arguments unchanged. */
	_rcu_defer_queue(fct, p);
}
339
340 static void rcu_add_deferer(pthread_t id)
341 {
342 struct deferer_registry *oldarray;
343
344 if (!registry) {
345 alloc_deferers = INIT_NUM_THREADS;
346 num_deferers = 0;
347 registry =
348 malloc(sizeof(struct deferer_registry) * alloc_deferers);
349 }
350 if (alloc_deferers < num_deferers + 1) {
351 oldarray = registry;
352 registry = malloc(sizeof(struct deferer_registry)
353 * (alloc_deferers << 1));
354 memcpy(registry, oldarray,
355 sizeof(struct deferer_registry) * alloc_deferers);
356 alloc_deferers <<= 1;
357 free(oldarray);
358 }
359 registry[num_deferers].tid = id;
360 /* reference to the TLS of _this_ deferer thread. */
361 registry[num_deferers].defer_queue = &defer_queue;
362 registry[num_deferers].last_head = 0;
363 num_deferers++;
364 }
365
366 /*
367 * Never shrink (implementation limitation).
368 * This is O(nb threads). Eventually use a hash table.
369 */
370 static void rcu_remove_deferer(pthread_t id)
371 {
372 struct deferer_registry *index;
373
374 assert(registry != NULL);
375 for (index = registry; index < registry + num_deferers; index++) {
376 if (pthread_equal(index->tid, id)) {
377 memcpy(index, &registry[num_deferers - 1],
378 sizeof(struct deferer_registry));
379 registry[num_deferers - 1].tid = 0;
380 registry[num_deferers - 1].defer_queue = NULL;
381 registry[num_deferers - 1].last_head = 0;
382 num_deferers--;
383 return;
384 }
385 }
386 /* Hrm not found, forgot to register ? */
387 assert(0);
388 }
389
390 static void start_defer_thread(void)
391 {
392 int ret;
393
394 ret = pthread_create(&tid_defer, NULL, thr_defer,
395 NULL);
396 assert(!ret);
397 }
398
399 static void stop_defer_thread(void)
400 {
401 int ret;
402 void *tret;
403
404 pthread_cancel(tid_defer);
405 wake_up_defer();
406 ret = pthread_join(tid_defer, &tret);
407 assert(!ret);
408 }
409
410 void rcu_defer_register_thread(void)
411 {
412 int deferers;
413
414 internal_urcu_lock(&defer_thread_mutex);
415 internal_urcu_lock(&urcu_defer_mutex);
416 defer_queue.q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
417 rcu_add_deferer(pthread_self());
418 deferers = num_deferers;
419 internal_urcu_unlock(&urcu_defer_mutex);
420
421 if (deferers == 1)
422 start_defer_thread();
423 internal_urcu_unlock(&defer_thread_mutex);
424 }
425
426 void rcu_defer_unregister_thread(void)
427 {
428 int deferers;
429
430 internal_urcu_lock(&defer_thread_mutex);
431 internal_urcu_lock(&urcu_defer_mutex);
432 rcu_remove_deferer(pthread_self());
433 _rcu_defer_barrier_thread();
434 free(defer_queue.q);
435 defer_queue.q = NULL;
436 deferers = num_deferers;
437 internal_urcu_unlock(&urcu_defer_mutex);
438
439 if (deferers == 0)
440 stop_defer_thread();
441 internal_urcu_unlock(&defer_thread_mutex);
442 }
443
444 void urcu_defer_exit(void)
445 {
446 free(registry);
447 }
This page took 0.038736 seconds and 4 git commands to generate.