#ifndef _URCU_WORKQUEUE_FIFO_H
#define _URCU_WORKQUEUE_FIFO_H

/*
 * urcu/workqueue-fifo.h
 *
 * Userspace RCU library - work queue scheme with FIFO semantic
 *
 * Copyright (c) 2014 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <urcu/uatomic.h>
#include <urcu/lfstack.h>
#include <urcu/waitqueue-lifo.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
#include <stdbool.h>
#include <assert.h>

enum urcu_accept_ret {
	URCU_ACCEPT_WORK	= 0,
	URCU_ACCEPT_SHUTDOWN	= 1,
};

enum urcu_enqueue_ret {
	URCU_ENQUEUE_OK		= 0,
	URCU_ENQUEUE_FULL	= 1,
};

/*
 * We use RCU to steal work from siblings. Therefore, one of the RCU
 * flavors needs to be included before this header. All workers that
 * participate in stealing (initialized with the URCU_WORKER_STEAL flag)
 * need to be registered as RCU reader threads.
 */
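
/*
 * A minimal setup sketch (illustrative, not part of this header's API):
 * include an RCU flavor before this header, and register each stealing
 * worker thread as an RCU reader.
 *
 *	#include <urcu.h>			// or another urcu flavor
 *	#include <urcu/workqueue-fifo.h>
 *
 *	// In each worker thread started with URCU_WORKER_STEAL:
 *	rcu_register_thread();
 *	...
 *	rcu_unregister_thread();
 */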

struct urcu_work {
	struct cds_wfcq_node node;
};
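
/*
 * Users typically embed struct urcu_work into their own work item type
 * and map back to it with caa_container_of(). A sketch, using a
 * hypothetical my_task type:
 *
 *	struct my_task {
 *		struct urcu_work work;
 *		int arg;
 *	};
 *
 *	struct urcu_work *w = urcu_dequeue_work(worker);
 *	struct my_task *t = caa_container_of(w, struct my_task, work);
 */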

struct urcu_workqueue {
	/* FIFO work queue */
	struct __cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Associated wait queue for LIFO wait/wakeup */
	struct urcu_wait_queue waitqueue;

	/* RCU linked list head of siblings for work stealing. */
	struct cds_list_head sibling_head;
	pthread_mutex_t sibling_lock;	/* Protect sibling list updates */

	/* Maximum number of work entries (approximate). 0 means infinite. */
	unsigned long nr_work_max;
	unsigned long nr_work;		/* Current number of work items */

	int worker_flags;		/* Worker flags */
	int shutdown;			/* Shutdown performed */
};

struct urcu_worker {
	/* Work queue which can either be used by the worker, or stolen. */
	struct cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Work belonging to this worker. Cannot be stolen. */
	struct urcu_work *own;

	struct urcu_wait_node wait_node;
	/* RCU linked list node of siblings for work stealing. */
	struct cds_list_head sibling_node;
	struct urcu_workqueue *queue;
	int flags;	/* enum urcu_worker_flags */
};

enum urcu_worker_flags {
	URCU_WORKER_STEAL	= (1 << 0),
};

static inline
void urcu_workqueue_init(struct urcu_workqueue *queue,
		unsigned long max_queue_len,
		int worker_flags)
{
	__cds_wfcq_init(&queue->head, &queue->tail);
	urcu_wait_queue_init(&queue->waitqueue);
	CDS_INIT_LIST_HEAD(&queue->sibling_head);
	pthread_mutex_init(&queue->sibling_lock, NULL);
	queue->nr_work_max = max_queue_len;
	queue->nr_work = 0;
	queue->worker_flags = worker_flags;	/* Inherited by workers at init. */
	queue->shutdown = false;
}
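
/*
 * Initialization sketch (illustrative; the queue length and flags are
 * arbitrary example values): a bounded queue of at most 1024 entries
 * whose workers participate in work stealing.
 *
 *	struct urcu_workqueue q;
 *
 *	urcu_workqueue_init(&q, 1024, URCU_WORKER_STEAL);
 */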

static inline
enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
		struct urcu_work *work)
{
	bool was_empty;
	unsigned long nr_work_max;

	nr_work_max = queue->nr_work_max;
	if (nr_work_max) {
		/* Approximate max queue size. */
		if (uatomic_read(&queue->nr_work) >= nr_work_max)
			return URCU_ENQUEUE_FULL;
		uatomic_inc(&queue->nr_work);
	}
	cds_wfcq_node_init(&work->node);

	/* Enqueue work. */
	was_empty = !cds_wfcq_enqueue(&queue->head, &queue->tail,
			&work->node);
	/*
	 * If the workqueue was previously empty, wake up one worker
	 * thread. It will eventually grab the entire content of the
	 * work queue (therefore grabbing a "work batch"). While that
	 * thread is running and taking care of its work batch, each
	 * time we enqueue more work we wake another thread (if there
	 * is one waiting), which will eventually grab the new batch,
	 * and so on. This scheme ensures that contiguous batches of
	 * work are handled by the same thread (for locality), and also
	 * ensures that we scale work to many worker threads when
	 * threads are busy enough to still be running when work is
	 * enqueued.
	 */
	if (was_empty) {
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}
	return URCU_ENQUEUE_OK;
}
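
/*
 * Producer-side sketch (illustrative; my_task and its allocation are
 * hypothetical): enqueue one work item, dropping it when the bounded
 * queue is full. A real producer might instead retry or apply
 * backpressure.
 *
 *	struct my_task *t = malloc(sizeof(*t));
 *
 *	t->arg = 42;
 *	if (urcu_queue_work(&q, &t->work) == URCU_ENQUEUE_FULL)
 *		free(t);	// queue full: caller keeps ownership
 */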

static inline
void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
{
	struct urcu_waiters waiters;

	rcu_read_lock();	/* Protect stack dequeue */
	urcu_move_waiters(&waiters, &queue->waitqueue);
	rcu_read_unlock();	/* Protect stack dequeue */

	(void) urcu_wake_all_waiters(&waiters);
}

static inline
void urcu_worker_init(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	cds_wfcq_init(&worker->head, &worker->tail);
	urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
	worker->own = NULL;
	worker->wait_node.node.next = NULL;
	worker->queue = queue;
	worker->flags = queue->worker_flags;
}

static inline
void urcu_worker_register(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_add_rcu(&worker->sibling_node, &queue->sibling_head);
		pthread_mutex_unlock(&queue->sibling_lock);
	}
}

static inline
void urcu_worker_unregister(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	enum cds_wfcq_ret wfcq_ret;

	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_del_rcu(&worker->sibling_node);
		pthread_mutex_unlock(&queue->sibling_lock);
	}

	/*
	 * Make sure we are removed from the waitqueue.
	 */
	if (urcu_in_waitqueue(&worker->wait_node))
		__urcu_workqueue_wakeup_all(queue);

	/*
	 * Put any local work we still have back into the workqueue.
	 */
	wfcq_ret = __cds_wfcq_splice_blocking(&queue->head,
			&queue->tail,
			&worker->head,
			&worker->tail);
	if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
			&& wfcq_ret == CDS_WFCQ_RET_DEST_EMPTY) {
		/*
		 * Wake up a worker thread if we have put work back
		 * into a workqueue that was previously empty.
		 */
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}

	/*
	 * Wait for a grace period before freeing or reusing "worker",
	 * because it is used by an RCU linked list. This also prevents
	 * ABA on the waitqueue stack dequeue: it matches the RCU
	 * read-side critical sections around the dequeue and move-all
	 * operations on the waitqueue.
	 */
	synchronize_rcu();
}

static inline
bool ___urcu_grab_work(struct urcu_worker *worker,
		cds_wfcq_head_ptr_t src_head,
		struct cds_wfcq_tail *src_tail,
		bool steal)
{
	enum cds_wfcq_ret splice_ret;
	struct __cds_wfcq_head tmp_head;
	struct cds_wfcq_tail tmp_tail;
	struct cds_wfcq_node *node;

	/*
	 * Don't bother grabbing the src queue lock if it is empty.
	 */
	if (cds_wfcq_empty(src_head, src_tail))
		return false;
	__cds_wfcq_init(&tmp_head, &tmp_tail);

	/* Ensure that we preserve FIFO work order. */
	assert(!steal || worker->own == NULL);

	/* Splice to temporary queue. */
	if (steal)
		cds_wfcq_dequeue_lock(src_head.h, src_tail);
	splice_ret = __cds_wfcq_splice_blocking(&tmp_head,
			&tmp_tail,
			src_head,
			src_tail);
	if (steal)
		cds_wfcq_dequeue_unlock(src_head.h, src_tail);
	if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY)
		return false;

	/*
	 * Keep one work entry for ourselves. This ensures forward
	 * progress amongst stealing co-workers, and also ensures that
	 * when a worker grabs some work from the global workqueue, it
	 * will have at least one work item to deal with.
	 */
	if (worker->own == NULL) {
		if (!steal) {
			/*
			 * Try to grab own work from the worker's
			 * workqueue to preserve FIFO order.
			 */
			node = cds_wfcq_dequeue_blocking(&worker->head,
					&worker->tail);
			if (node)
				goto got_node;
		}
		node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail);
		assert(node != NULL);
got_node:
		worker->own = caa_container_of(node, struct urcu_work, node);
	}

	/* Splice into the worker's workqueue. */
	splice_ret = __cds_wfcq_splice_blocking(&worker->head,
			&worker->tail,
			&tmp_head,
			&tmp_tail);
	/* Ensure that we preserve FIFO work order. */
	assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
	return true;
}

/*
 * Try stealing work from siblings when we have nothing to do.
 */
static inline
bool ___urcu_steal_work(struct urcu_worker *worker,
		struct urcu_worker *sibling)
{
	return ___urcu_grab_work(worker, &sibling->head, &sibling->tail, 1);
}

static inline
bool __urcu_steal_work(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool steal_performed = false;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		steal_performed = ___urcu_steal_work(worker, sibling_next);
	if (steal_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		steal_performed = ___urcu_steal_work(worker, sibling_prev);
end:
	rcu_read_unlock();

	return steal_performed;
}

static inline
bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
{
	return urcu_adaptative_wake_up(&sibling->wait_node);
}

static inline
bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool wakeup_performed = false;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	/* Only wake up siblings if we have work in our own queue. */
	if (cds_wfcq_empty(&worker->head, &worker->tail))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
	if (wakeup_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
end:
	rcu_read_unlock();

	return wakeup_performed;
}

static inline
enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	bool has_work;

	has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
	/* Don't wait if we have work to do. */
	if (has_work || worker->own
			|| !cds_wfcq_empty(&worker->head, &worker->tail))
		goto do_work;
	/* Try to steal work from a sibling instead of blocking. */
	if (__urcu_steal_work(queue, worker))
		goto do_work;
	/* No more work to do, check shutdown state. */
	if (CMM_LOAD_SHARED(queue->shutdown))
		return URCU_ACCEPT_SHUTDOWN;
	urcu_wait_set_state(&worker->wait_node,
			URCU_WAIT_WAITING);
	if (!urcu_in_waitqueue(&worker->wait_node)) {
		int was_empty;

		/* Protect stack dequeue against ABA. */
		synchronize_rcu();
		was_empty = !urcu_wait_add(&queue->waitqueue,
				&worker->wait_node);
		/*
		 * If the wait queue was empty, it means we are the
		 * first thread to be put back into an otherwise empty
		 * wait queue. Re-check whether the work queue is empty
		 * after adding ourselves to the wait queue, so we can
		 * wake up the top of the wait queue if new work has
		 * appeared and the work enqueuer may not have seen
		 * that it needed to issue a wakeup.
		 */
		if (was_empty && !cds_wfcq_empty(&queue->head,
				&queue->tail)) {
			rcu_read_lock();	/* Protect stack dequeue */
			(void) urcu_dequeue_wake_single(&queue->waitqueue);
			rcu_read_unlock();	/* Protect stack dequeue */
		}
	} else {
		/*
		 * We are in the queue, or the dispatcher just removed
		 * us from it (after we read the next pointer), and is
		 * therefore awakening us. The state will therefore
		 * have been changed from WAITING to some other state,
		 * which will let the busy-wait pass through.
		 */
	}
	urcu_adaptative_busy_wait(&worker->wait_node);
	return URCU_ACCEPT_WORK;

do_work:
	/*
	 * We will be busy handling the work batch, awaken siblings so
	 * they can steal from us.
	 */
	(void) __urcu_wakeup_siblings(queue, worker);
	return URCU_ACCEPT_WORK;
}

static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	struct cds_wfcq_node *node;
	struct urcu_work *work;

	if (worker->own) {
		/* Process our own work entry. */
		work = worker->own;
		worker->own = NULL;
		goto end;
	}
	/*
	 * If we are registered for work stealing, we need to dequeue
	 * safely against siblings.
	 */
	if (worker->flags & URCU_WORKER_STEAL) {
		/*
		 * Don't bother grabbing the worker queue lock if it is
		 * empty.
		 */
		if (cds_wfcq_empty(&worker->head, &worker->tail))
			return NULL;
		node = cds_wfcq_dequeue_blocking(&worker->head,
				&worker->tail);
	} else {
		node = ___cds_wfcq_dequeue_with_state(&worker->head,
				&worker->tail, NULL, 1, 0);
	}
	if (!node)
		return NULL;
	work = caa_container_of(node, struct urcu_work, node);
end:
	if (queue->nr_work_max)
		uatomic_dec(&queue->nr_work);
	return work;
}
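
/*
 * Worker-thread sketch (illustrative; process_task() and my_task are
 * hypothetical): accept work batches until shutdown, draining each
 * batch with urcu_dequeue_work().
 *
 *	struct urcu_worker worker;
 *	struct urcu_work *w;
 *
 *	rcu_register_thread();
 *	urcu_worker_init(&q, &worker);
 *	urcu_worker_register(&q, &worker);
 *	for (;;) {
 *		if (urcu_accept_work(&worker) == URCU_ACCEPT_SHUTDOWN)
 *			break;
 *		while ((w = urcu_dequeue_work(&worker)) != NULL)
 *			process_task(caa_container_of(w, struct my_task, work));
 *	}
 *	urcu_worker_unregister(&q, &worker);
 *	rcu_unregister_thread();
 */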

static inline
void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
{
	/* Set shutdown */
	CMM_STORE_SHARED(queue->shutdown, true);
	/* Wakeup all workers */
	__urcu_workqueue_wakeup_all(queue);
}
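
/*
 * Shutdown sketch (illustrative; worker_tid[] and nr_threads are
 * hypothetical): request shutdown, then join the worker threads. Each
 * worker returns URCU_ACCEPT_SHUTDOWN from urcu_accept_work() once it
 * has no more work to grab or steal.
 *
 *	urcu_workqueue_shutdown(&q);
 *	for (i = 0; i < nr_threads; i++)
 *		pthread_join(worker_tid[i], NULL);
 */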

/*
 * Used to let the dispatcher steal work from the entire queue in case
 * of stall. The "worker" parameter needs to be initialized, but is
 * usually not registered.
 */
static inline
bool urcu_workqueue_steal_all(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling;
	bool has_work = false;

	if (worker->flags & URCU_WORKER_STEAL) {
		rcu_read_lock();
		/* Steal from each worker */
		cds_list_for_each_entry_rcu(sibling, &queue->sibling_head,
				sibling_node)
			has_work |= ___urcu_grab_work(worker, &sibling->head,
					&sibling->tail, 1);
		rcu_read_unlock();
	}

	/* Steal from the global workqueue */
	has_work |= ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
	return has_work;
}
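
/*
 * Stall-recovery sketch (illustrative): a dispatcher that detects
 * stalled workers can pull every pending item into a local,
 * unregistered worker and process the backlog itself. The dispatcher
 * thread must itself be a registered RCU reader.
 *
 *	struct urcu_worker rescuer;
 *	struct urcu_work *w;
 *
 *	urcu_worker_init(&q, &rescuer);
 *	if (urcu_workqueue_steal_all(&q, &rescuer))
 *		while ((w = urcu_dequeue_work(&rescuer)) != NULL)
 *			process_task(caa_container_of(w, struct my_task, work));
 */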

#endif /* _URCU_WORKQUEUE_FIFO_H */