change reader_data for reader_registry
[urcu.git] / urcu.c
1 /*
2 * urcu.c
3 *
4 * Userspace RCU library
5 *
6 * Copyright February 2009 - Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
7 *
8 * Distributed under GPLv2
9 */
10
11 #include <stdio.h>
12 #include <pthread.h>
13 #include <signal.h>
14 #include <assert.h>
15 #include <stdlib.h>
16 #include <string.h>
17
18 #include "urcu.h"
19
20 pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
21
22 /*
23 * Global grace period counter.
24 * Contains the current RCU_GP_CTR_BIT.
25 * Also has a RCU_GP_CTR_BIT of 1, to accelerate the reader fast path.
26 * Written to only by writer with mutex taken. Read by both writer and readers.
27 */
28 long urcu_gp_ctr = RCU_GP_COUNT;
29
30 /*
31 * Written to only by each individual reader. Read by both the reader and the
32 * writers.
33 */
34 long __thread urcu_active_readers;
35
/* Registry of reader threads. */

/* Number of registry slots allocated on the first registration. */
#define INIT_NUM_THREADS 4

/* One registry slot per registered reader thread. */
struct reader_registry {
	/* Thread ID of the registered reader (target of pthread_kill()). */
	pthread_t tid;
	/* Address of the reader counter recorded at registration time. */
	long *urcu_active_readers;
};
43
#ifdef DEBUG_YIELD
/* Debug-only yield state; only compiled in when DEBUG_YIELD is defined. */
unsigned int yield_active;
unsigned int __thread rand_yield;
#endif

/*
 * Reader registry. The callers visible in this file manipulate it with
 * urcu_mutex held.
 */
/* Array of registered readers; NULL until the first registration. */
static struct reader_registry *registry;
/* Number of slots of the array currently in use. */
static int num_readers;
/* Number of slots currently allocated in the array. */
static int alloc_readers;
#ifndef DEBUG_FULL_MB
/*
 * Reset by the force_mb_*() helpers before they send SIGURCU, incremented
 * by sigurcu_handler(), and polled by the helpers until enough handlers
 * have run.
 */
static int sig_done;
#endif
54
55 void internal_urcu_lock(void)
56 {
57 #if 0
58 int ret;
59 /* Mutex sleeping does not play well with busy-waiting loop. */
60 ret = pthread_mutex_lock(&urcu_mutex);
61 if (ret) {
62 perror("Error in pthread mutex lock");
63 exit(-1);
64 }
65 #endif
66 while (pthread_mutex_trylock(&urcu_mutex) != 0)
67 cpu_relax();
68 }
69
70 void internal_urcu_unlock(void)
71 {
72 int ret;
73
74 ret = pthread_mutex_unlock(&urcu_mutex);
75 if (ret) {
76 perror("Error in pthread mutex unlock");
77 exit(-1);
78 }
79 }
80
81 /*
82 * called with urcu_mutex held.
83 */
84 static void switch_next_urcu_qparity(void)
85 {
86 STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_CTR_BIT);
87 }
88
89 #ifdef DEBUG_FULL_MB
/*
 * DEBUG_FULL_MB variant: no signal is sent to the reader; only the calling
 * thread executes a memory barrier. The tid argument is unused.
 */
static void force_mb_single_thread(pthread_t tid)
{
	(void) tid;
	smp_mb();
}
94
/*
 * DEBUG_FULL_MB variant: no signals are sent to the readers; only the
 * calling thread executes a memory barrier.
 */
static void force_mb_all_threads(void)
{
	smp_mb();
}
99 #else
100
101 static void force_mb_single_thread(pthread_t tid)
102 {
103 assert(registry);
104 sig_done = 0;
105 /*
106 * pthread_kill has a smp_mb(). But beware, we assume it performs
107 * a cache flush on architectures with non-coherent cache. Let's play
108 * safe and don't assume anything : we use smp_mc() to make sure the
109 * cache flush is enforced.
110 * smp_mb(); write sig_done before sending the signals
111 */
112 smp_mc(); /* write sig_done before sending the signals */
113 pthread_kill(tid, SIGURCU);
114 /*
115 * Wait for sighandler (and thus mb()) to execute on every thread.
116 * BUSY-LOOP.
117 */
118 while (LOAD_SHARED(sig_done) < 1)
119 cpu_relax();
120 smp_mb(); /* read sig_done before ending the barrier */
121 }
122
123 static void force_mb_all_threads(void)
124 {
125 struct reader_registry *index;
126 /*
127 * Ask for each threads to execute a smp_mb() so we can consider the
128 * compiler barriers around rcu read lock as real memory barriers.
129 */
130 if (!registry)
131 return;
132 sig_done = 0;
133 /*
134 * pthread_kill has a smp_mb(). But beware, we assume it performs
135 * a cache flush on architectures with non-coherent cache. Let's play
136 * safe and don't assume anything : we use smp_mc() to make sure the
137 * cache flush is enforced.
138 * smp_mb(); write sig_done before sending the signals
139 */
140 smp_mc(); /* write sig_done before sending the signals */
141 for (index = registry; index < registry + num_readers; index++)
142 pthread_kill(index->tid, SIGURCU);
143 /*
144 * Wait for sighandler (and thus mb()) to execute on every thread.
145 * BUSY-LOOP.
146 */
147 while (LOAD_SHARED(sig_done) < num_readers)
148 cpu_relax();
149 smp_mb(); /* read sig_done before ending the barrier */
150 }
151 #endif
152
153 void wait_for_quiescent_state(void)
154 {
155 struct reader_registry *index;
156
157 if (!registry)
158 return;
159 /*
160 * Wait for each thread urcu_active_readers count to become 0.
161 */
162 for (index = registry; index < registry + num_readers; index++) {
163 int wait_loops = 0;
164 /*
165 * BUSY-LOOP. Force the reader thread to commit its
166 * urcu_active_readers update to memory if we wait for too long.
167 */
168 while (rcu_old_gp_ongoing(index->urcu_active_readers)) {
169 if (wait_loops++ == KICK_READER_LOOPS) {
170 force_mb_single_thread(index->tid);
171 wait_loops = 0;
172 } else {
173 cpu_relax();
174 }
175 }
176 }
177 }
178
/*
 * Wait for a grace period: flip the grace period parity twice, waiting
 * after each flip for the readers of the previous parity, with a memory
 * barrier forced on all reader threads before and after. The whole
 * sequence runs with urcu_mutex held.
 */
void synchronize_rcu(void)
{
	internal_urcu_lock();

	/*
	 * Write the new pointer before changing the parity: all threads
	 * should read the parity before accessing the data structure the new
	 * pointer points to. Done inside the lock because the helper
	 * iterates on the reader threads.
	 */
	force_mb_all_threads();

	switch_next_urcu_qparity();	/* 0 -> 1 */

	/*
	 * The parity update must be committed to memory before we wait for
	 * the parity 0 quiescent state. Otherwise the writer could wait
	 * forever while new readers are always accessing data (no progress).
	 * Ensured by STORE_SHARED and LOAD_SHARED.
	 */

	/* Wait for the previous parity to be empty of readers. */
	wait_for_quiescent_state();	/* Wait readers in parity 0 */

	/*
	 * The wait for the parity 0 quiescent state must be finished before
	 * the next parity update is committed to memory. Otherwise the writer
	 * could wait forever while new readers are always accessing data
	 * (no progress).
	 * Ensured by STORE_SHARED and LOAD_SHARED.
	 */

	switch_next_urcu_qparity();	/* 1 -> 0 */

	/*
	 * The parity update must be committed to memory before we wait for
	 * the parity 1 quiescent state. Otherwise the writer could wait
	 * forever while new readers are always accessing data (no progress).
	 * Ensured by STORE_SHARED and LOAD_SHARED.
	 */

	/* Wait for the previous parity to be empty of readers. */
	wait_for_quiescent_state();	/* Wait readers in parity 1 */

	/*
	 * Finish waiting for the reader threads before letting the old
	 * pointer be freed. Done inside the lock because the helper iterates
	 * on the reader threads.
	 */
	force_mb_all_threads();

	internal_urcu_unlock();
}
234
235 void urcu_add_reader(pthread_t id)
236 {
237 struct reader_registry *oldarray;
238
239 if (!registry) {
240 alloc_readers = INIT_NUM_THREADS;
241 num_readers = 0;
242 registry =
243 malloc(sizeof(struct reader_registry) * alloc_readers);
244 }
245 if (alloc_readers < num_readers + 1) {
246 oldarray = registry;
247 registry = malloc(sizeof(struct reader_registry)
248 * (alloc_readers << 1));
249 memcpy(registry, oldarray,
250 sizeof(struct reader_registry) * alloc_readers);
251 alloc_readers <<= 1;
252 free(oldarray);
253 }
254 registry[num_readers].tid = id;
255 /* reference to the TLS of _this_ reader thread. */
256 registry[num_readers].urcu_active_readers = &urcu_active_readers;
257 num_readers++;
258 }
259
260 /*
261 * Never shrink (implementation limitation).
262 * This is O(nb threads). Eventually use a hash table.
263 */
264 void urcu_remove_reader(pthread_t id)
265 {
266 struct reader_registry *index;
267
268 assert(registry != NULL);
269 for (index = registry; index < registry + num_readers; index++) {
270 if (pthread_equal(index->tid, id)) {
271 memcpy(index, &registry[num_readers - 1],
272 sizeof(struct reader_registry));
273 registry[num_readers - 1].tid = 0;
274 registry[num_readers - 1].urcu_active_readers = NULL;
275 num_readers--;
276 return;
277 }
278 }
279 /* Hrm not found, forgot to register ? */
280 assert(0);
281 }
282
/*
 * Register the calling thread as a reader: adds it to the reader registry
 * with urcu_mutex held.
 */
void urcu_register_thread(void)
{
	pthread_t self;

	internal_urcu_lock();
	self = pthread_self();
	urcu_add_reader(self);
	internal_urcu_unlock();
}
289
/*
 * Unregister the calling thread: removes it from the reader registry with
 * urcu_mutex held.
 */
void urcu_unregister_thread(void)
{
	pthread_t self;

	internal_urcu_lock();
	self = pthread_self();
	urcu_remove_reader(self);
	internal_urcu_unlock();
}
296
297 #ifndef DEBUG_FULL_MB
298 void sigurcu_handler(int signo, siginfo_t *siginfo, void *context)
299 {
300 /*
301 * Executing this smp_mb() is the only purpose of this signal handler.
302 * It punctually promotes barrier() into smp_mb() on every thread it is
303 * executed on.
304 */
305 smp_mb();
306 atomic_inc(&sig_done);
307 }
308
309 void __attribute__((constructor)) urcu_init(void)
310 {
311 struct sigaction act;
312 int ret;
313
314 act.sa_sigaction = sigurcu_handler;
315 ret = sigaction(SIGURCU, &act, NULL);
316 if (ret) {
317 perror("Error in sigaction");
318 exit(-1);
319 }
320 }
321
322 void __attribute__((destructor)) urcu_exit(void)
323 {
324 struct sigaction act;
325 int ret;
326
327 ret = sigaction(SIGURCU, NULL, &act);
328 if (ret) {
329 perror("Error in sigaction");
330 exit(-1);
331 }
332 assert(act.sa_sigaction == sigurcu_handler);
333 free(registry);
334 }
335 #endif
This page took 0.058939 seconds and 5 git commands to generate.