#ifndef _URCU_WORKQUEUE_FIFO_H
#define _URCU_WORKQUEUE_FIFO_H

/*
 * urcu/workqueue-fifo.h
 *
 * Userspace RCU library - work queue scheme with FIFO semantic
 *
 * Copyright (c) 2014 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <urcu/uatomic.h>
#include <urcu/lfstack.h>
#include <urcu/waitqueue-lifo.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
#include <assert.h>

enum urcu_accept_ret {
        URCU_ACCEPT_WORK        = 0,
        URCU_ACCEPT_SHUTDOWN    = 1,
};

/*
 * We use RCU to steal work from siblings. Therefore, one of the RCU
 * flavors needs to be included before this header. All workers that
 * participate in stealing (initialized with the URCU_WORKER_STEAL flag)
 * need to be registered as RCU reader threads.
 */
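
/*
 * For example, a program using the default RCU flavor could set up a
 * stealing worker thread along these lines (a sketch only; the thread
 * function name is hypothetical, and any urcu flavor works the same
 * way):
 *
 *	#include <urcu.h>		// one RCU flavor, before this header
 *	#include <urcu/workqueue-fifo.h>
 *
 *	static void *worker_thread_fn(void *arg)
 *	{
 *		rcu_register_thread();	// required for URCU_WORKER_STEAL workers
 *		// ... worker loop, see urcu_accept_work()/urcu_dequeue_work() below ...
 *		rcu_unregister_thread();
 *		return NULL;
 *	}
 */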

struct urcu_work {
        struct cds_wfcq_node node;
};
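
/*
 * struct urcu_work is meant to be embedded into a caller-defined work
 * item and recovered with caa_container_of() in the work handler. A
 * minimal sketch (the "my_job" type, its fields and the handler are
 * hypothetical; only struct urcu_work and caa_container_of() come from
 * the library):
 *
 *	struct my_job {
 *		int payload;
 *		struct urcu_work work;
 *	};
 *
 *	static void handle(struct urcu_work *work)
 *	{
 *		struct my_job *job =
 *			caa_container_of(work, struct my_job, work);
 *
 *		do_something(job->payload);	// hypothetical processing
 *		free(job);
 *	}
 */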

struct urcu_workqueue {
        /* FIFO work queue */
        struct __cds_wfcq_head head;
        struct cds_wfcq_tail tail;

        /* Associated wait queue for LIFO wait/wakeup */
        struct urcu_wait_queue waitqueue;

        /* RCU linked list head of siblings for work stealing. */
        struct cds_list_head sibling_head;
        pthread_mutex_t sibling_lock;   /* Protect sibling list updates */

        bool shutdown;                  /* Shutdown performed */
};

struct urcu_worker {
        /* Workqueue which can be either used by worker, or stolen. */
        struct cds_wfcq_head head;
        struct cds_wfcq_tail tail;

        /* Work belonging to worker. Cannot be stolen. */
        struct urcu_work *own;

        struct urcu_wait_node wait_node;
        /* RCU linked list node of siblings for work stealing. */
        struct cds_list_head sibling_node;
        int flags;                      /* enum urcu_worker_flags */
};

enum urcu_worker_flags {
        URCU_WORKER_STEAL       = (1 << 0),
};

static inline
void urcu_workqueue_init(struct urcu_workqueue *queue)
{
        __cds_wfcq_init(&queue->head, &queue->tail);
        urcu_wait_queue_init(&queue->waitqueue);
        CDS_INIT_LIST_HEAD(&queue->sibling_head);
        pthread_mutex_init(&queue->sibling_lock, NULL);
        queue->shutdown = false;
}

static inline
void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
{
        bool was_empty;

        cds_wfcq_node_init(&work->node);

        /* Enqueue work. */
        was_empty = !cds_wfcq_enqueue(&queue->head, &queue->tail,
                        &work->node);
        /*
         * If the workqueue was previously empty, wake up one worker
         * thread. It will eventually grab the entire content of the
         * work-queue (therefore grabbing a "work batch"). After having
         * grabbed the work batch, while that thread is running and
         * taking care of that work batch, when we enqueue more work, we
         * will wake another thread (if there is one waiting), which
         * will eventually grab the new batch, and so on. This scheme
         * ensures that contiguous batches of work are handled by the
         * same thread (for locality), and also ensures that we scale
         * work to many worker threads when threads are busy enough to
         * still be running when work is enqueued.
         */
        if (was_empty) {
                rcu_read_lock();        /* Protect stack dequeue */
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
                rcu_read_unlock();      /* Protect stack dequeue */
        }
}
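
/*
 * Dispatcher-side usage sketch, reusing the hypothetical struct my_job
 * from the example above; only the urcu_workqueue_init() and
 * urcu_queue_work() calls come from this header:
 *
 *	static struct urcu_workqueue queue;
 *
 *	urcu_workqueue_init(&queue);
 *
 *	struct my_job *job = malloc(sizeof(*job));
 *	job->payload = 42;
 *	urcu_queue_work(&queue, &job->work);
 *
 * Note that urcu_queue_work() takes an RCU read-side lock internally,
 * so the enqueuing thread must itself be a registered RCU reader
 * thread for the chosen flavor.
 */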

static inline
void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
{
        struct urcu_waiters waiters;

        rcu_read_lock();        /* Protect stack dequeue */
        urcu_move_waiters(&waiters, &queue->waitqueue);
        rcu_read_unlock();      /* Protect stack dequeue */

        (void) urcu_wake_all_waiters(&waiters);
}

static inline
void urcu_worker_init(struct urcu_worker *worker, int flags)
{
        cds_wfcq_init(&worker->head, &worker->tail);
        worker->flags = flags;
        urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
        worker->own = NULL;
}

static inline
void urcu_worker_register(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
{
        if (worker->flags & URCU_WORKER_STEAL) {
                pthread_mutex_lock(&queue->sibling_lock);
                cds_list_add_rcu(&worker->sibling_node, &queue->sibling_head);
                pthread_mutex_unlock(&queue->sibling_lock);
        }
}

static inline
void urcu_worker_unregister(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
{
        enum cds_wfcq_ret wfcq_ret;

        if (worker->flags & URCU_WORKER_STEAL) {
                pthread_mutex_lock(&queue->sibling_lock);
                cds_list_del_rcu(&worker->sibling_node);
                pthread_mutex_unlock(&queue->sibling_lock);
        }

        /*
         * Make sure we are removed from the waitqueue.
         */
        if (CMM_LOAD_SHARED(worker->wait_node.node.next))
                __urcu_workqueue_wakeup_all(queue);

        /*
         * Put any local work we still have back into the workqueue.
         */
        wfcq_ret = __cds_wfcq_splice_blocking(&queue->head,
                        &queue->tail,
                        &worker->head,
                        &worker->tail);
        if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
                        && wfcq_ret == CDS_WFCQ_RET_DEST_EMPTY) {
                /*
                 * Wake up a worker thread if we have put work back
                 * into a workqueue that was previously empty.
                 */
                rcu_read_lock();        /* Protect stack dequeue */
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
                rcu_read_unlock();      /* Protect stack dequeue */
        }

        /*
         * Wait for a grace period before freeing or reusing "worker",
         * because it is still reachable through the RCU linked list.
         * This also prevents ABA for the waitqueue stack dequeue: it
         * matches the RCU read-side critical sections around the
         * dequeue and move-all operations on the waitqueue.
         */
        synchronize_rcu();
}

static inline
bool ___urcu_grab_work(struct urcu_worker *worker,
                cds_wfcq_head_ptr_t src_head,
                struct cds_wfcq_tail *src_tail,
                bool steal)
{
        enum cds_wfcq_ret splice_ret;
        struct cds_wfcq_head tmp_head;
        struct cds_wfcq_tail tmp_tail;
        struct cds_wfcq_node *node;

        /*
         * Don't bother grabbing the src queue lock if it is empty.
         */
        if (cds_wfcq_empty(src_head, src_tail))
                return false;
        cds_wfcq_init(&tmp_head, &tmp_tail);

        /* Ensure that we preserve FIFO work order. */
        assert(!steal || worker->own == NULL);

        /* Splice to temporary queue. */
        if (steal)
                cds_wfcq_dequeue_lock(src_head.h, src_tail);
        splice_ret = __cds_wfcq_splice_blocking(&tmp_head,
                        &tmp_tail,
                        src_head,
                        src_tail);
        if (steal)
                cds_wfcq_dequeue_unlock(src_head.h, src_tail);
        if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY)
                return false;

        /*
         * Keep one work entry for ourselves. This ensures forward
         * progress amongst stealing co-workers. This also ensures that
         * when a worker grabs some work from the global workqueue, it
         * will have at least one work item to deal with.
         */
        if (worker->own == NULL) {
                if (!steal) {
                        /*
                         * Try to grab own work from the worker's
                         * workqueue to preserve FIFO order.
                         */
                        node = cds_wfcq_dequeue_blocking(&worker->head,
                                        &worker->tail);
                        if (node)
                                goto got_node;
                }
                node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail);
                assert(node != NULL);
got_node:
                worker->own = caa_container_of(node, struct urcu_work, node);
        }

        /* Splice into the worker's workqueue. */
        splice_ret = cds_wfcq_splice_blocking(&worker->head,
                        &worker->tail,
                        &tmp_head,
                        &tmp_tail);
        /* Ensure that we preserve FIFO work order. */
        assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
        return true;
}

/*
 * Try stealing work from siblings when we have nothing to do.
 */
static inline
bool ___urcu_steal_work(struct urcu_worker *worker,
                struct urcu_worker *sibling)
{
        return ___urcu_grab_work(worker, &sibling->head, &sibling->tail, true);
}

static inline
bool __urcu_steal_work(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
{
        struct urcu_worker *sibling_prev, *sibling_next;
        struct cds_list_head *sibling_node;
        bool steal_performed = false;

        if (!(worker->flags & URCU_WORKER_STEAL))
                return false;

        rcu_read_lock();

        sibling_node = rcu_dereference(worker->sibling_node.next);
        if (sibling_node == &queue->sibling_head)
                sibling_node = rcu_dereference(sibling_node->next);
        sibling_next = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_next != worker)
                steal_performed = ___urcu_steal_work(worker, sibling_next);
        if (steal_performed)
                goto end;

        sibling_node = rcu_dereference(worker->sibling_node.prev);
        if (sibling_node == &queue->sibling_head)
                sibling_node = rcu_dereference(sibling_node->prev);
        sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_prev != worker && sibling_prev != sibling_next)
                steal_performed = ___urcu_steal_work(worker, sibling_prev);
end:
        rcu_read_unlock();

        return steal_performed;
}

static inline
bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
{
        return urcu_adaptative_wake_up(&sibling->wait_node);
}

static inline
bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
{
        struct urcu_worker *sibling_prev, *sibling_next;
        struct cds_list_head *sibling_node;
        bool wakeup_performed = false;

        if (!(worker->flags & URCU_WORKER_STEAL))
                return false;

        /* Only wake up siblings if we have work in our own queue. */
        if (cds_wfcq_empty(&worker->head, &worker->tail))
                return false;

        rcu_read_lock();

        sibling_node = rcu_dereference(worker->sibling_node.next);
        if (sibling_node == &queue->sibling_head)
                sibling_node = rcu_dereference(sibling_node->next);
        sibling_next = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_next != worker)
                wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
        if (wakeup_performed)
                goto end;

        sibling_node = rcu_dereference(worker->sibling_node.prev);
        if (sibling_node == &queue->sibling_head)
                sibling_node = rcu_dereference(sibling_node->prev);
        sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_prev != worker && sibling_prev != sibling_next)
                wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
end:
        rcu_read_unlock();

        return wakeup_performed;
}

static inline
enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
{
        bool has_work;

        has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, false);
        /* Don't wait if we have work to do. */
        if (has_work || !cds_wfcq_empty(&worker->head, &worker->tail))
                goto do_work;
        /* Try to steal work from a sibling instead of blocking. */
        if (__urcu_steal_work(queue, worker))
                goto do_work;
        /* No more work to do, check shutdown state. */
        if (CMM_LOAD_SHARED(queue->shutdown))
                return URCU_ACCEPT_SHUTDOWN;
        urcu_wait_set_state(&worker->wait_node,
                        URCU_WAIT_WAITING);
        if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
                int was_empty;

                /*
                 * NULL next pointer. We are therefore not in
                 * the queue.
                 */
                cds_lfs_node_init(&worker->wait_node.node);
                /* Protect stack dequeue against ABA. */
                synchronize_rcu();
                was_empty = !urcu_wait_add(&queue->waitqueue,
                                &worker->wait_node);
                /*
                 * If the wait queue was empty, it means we are the
                 * first thread to be put back into an otherwise empty
                 * wait queue. Re-check whether the work queue is empty
                 * after adding ourselves to the wait queue, so we can
                 * wake up the top of the wait queue in case new work
                 * has appeared and the work enqueuer did not see that
                 * it needed to issue a wakeup.
                 */
                if (was_empty && !cds_wfcq_empty(&queue->head,
                                &queue->tail)) {
                        rcu_read_lock();        /* Protect stack dequeue */
                        (void) urcu_dequeue_wake_single(&queue->waitqueue);
                        rcu_read_unlock();      /* Protect stack dequeue */
                }
        } else {
                /*
                 * Non-NULL next pointer. We are therefore in
                 * the queue, or the dispatcher just removed us
                 * from it (after we read the next pointer), and
                 * is therefore awakening us. The state will
                 * therefore have been changed from WAITING to
                 * some other state, which will let the busy
                 * wait pass through.
                 */
        }
        urcu_adaptative_busy_wait(&worker->wait_node);
        return URCU_ACCEPT_WORK;

do_work:
        /*
         * We will be busy handling the work batch, so wake up siblings
         * so they can steal from us.
         */
        (void) __urcu_wakeup_siblings(queue, worker);
        return URCU_ACCEPT_WORK;
}

static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
        struct cds_wfcq_node *node;

        if (worker->own) {
                struct urcu_work *work;

                /* Process our own work entry. */
                work = worker->own;
                worker->own = NULL;
                return work;
        }
        /*
         * If we are registered for work stealing, we need to dequeue
         * safely against siblings.
         */
        if (worker->flags & URCU_WORKER_STEAL) {
                /*
                 * Don't bother grabbing the worker queue lock if it is
                 * empty.
                 */
                if (cds_wfcq_empty(&worker->head, &worker->tail))
                        return NULL;
                node = cds_wfcq_dequeue_blocking(&worker->head,
                                &worker->tail);
        } else {
                node = ___cds_wfcq_dequeue_with_state(&worker->head,
                                &worker->tail, NULL, 1, 0);
        }
        if (!node)
                return NULL;
        return caa_container_of(node, struct urcu_work, node);
}
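
/*
 * Worker-side usage sketch. The loop structure and the handle()
 * function are hypothetical; only the urcu_worker_*, urcu_accept_work()
 * and urcu_dequeue_work() calls come from this header.
 *
 *	static void *worker_thread_fn(void *arg)
 *	{
 *		struct urcu_workqueue *queue = arg;
 *		struct urcu_worker worker;
 *
 *		rcu_register_thread();
 *		urcu_worker_init(&worker, URCU_WORKER_STEAL);
 *		urcu_worker_register(queue, &worker);
 *
 *		for (;;) {
 *			enum urcu_accept_ret ret;
 *			struct urcu_work *work;
 *
 *			ret = urcu_accept_work(queue, &worker);
 *			if (ret == URCU_ACCEPT_SHUTDOWN)
 *				break;
 *			while ((work = urcu_dequeue_work(&worker)) != NULL)
 *				handle(work);	// hypothetical work handler
 *		}
 *
 *		urcu_worker_unregister(queue, &worker);
 *		rcu_unregister_thread();
 *		return NULL;
 *	}
 */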

static inline
void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
{
        /* Set shutdown */
        CMM_STORE_SHARED(queue->shutdown, true);
        /* Wakeup all workers */
        __urcu_workqueue_wakeup_all(queue);
}
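
/*
 * Dispatcher-side shutdown sketch (thread bookkeeping variables are
 * hypothetical):
 *
 *	urcu_workqueue_shutdown(&queue);
 *	for (i = 0; i < nr_workers; i++)
 *		pthread_join(worker_threads[i], NULL);
 *
 * Workers observe the shutdown flag through urcu_accept_work(), which
 * then returns URCU_ACCEPT_SHUTDOWN once no more work is available.
 */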

#endif /* _URCU_WORKQUEUE_FIFO_H */