workqueue: add approximate upper bound to queue length
#ifndef _URCU_WORKQUEUE_FIFO_H
#define _URCU_WORKQUEUE_FIFO_H

/*
 * urcu/workqueue-fifo.h
 *
 * Userspace RCU library - work queue scheme with FIFO semantic
 *
 * Copyright (c) 2014 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <urcu/uatomic.h>
#include <urcu/lfstack.h>
#include <urcu/waitqueue-lifo.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
#include <assert.h>
#include <stdbool.h>	/* for bool (may already be pulled in by urcu headers) */

enum urcu_accept_ret {
	URCU_ACCEPT_WORK = 0,
	URCU_ACCEPT_SHUTDOWN = 1,
};

enum urcu_enqueue_ret {
	URCU_ENQUEUE_OK = 0,
	URCU_ENQUEUE_FULL = 1,
};

/*
 * We use RCU to steal work from siblings. Therefore, one of the RCU
 * flavors needs to be included before this header. All workers that
 * participate in stealing (initialized with the URCU_WORKER_STEAL flag)
 * need to be registered RCU reader threads.
 */

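/*
 * Illustrative worker lifecycle sketch, not part of this API: the
 * flavor header (urcu.h) and passing the queue through "arg" are
 * assumptions of this example.
 *
 *	#include <urcu.h>
 *
 *	static void *worker_thread(void *arg)
 *	{
 *		struct urcu_workqueue *q = arg;
 *		struct urcu_worker worker;
 *
 *		rcu_register_thread();
 *		urcu_worker_init(q, &worker, URCU_WORKER_STEAL);
 *		urcu_worker_register(q, &worker);
 *
 *		... accept and dequeue work; see urcu_accept_work()
 *		    and urcu_dequeue_work() below ...
 *
 *		urcu_worker_unregister(q, &worker);
 *		rcu_unregister_thread();
 *		return NULL;
 *	}
 */
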
struct urcu_work {
	struct cds_wfcq_node node;
};

struct urcu_workqueue {
	/* FIFO work queue */
	struct __cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Associated wait queue for LIFO wait/wakeup */
	struct urcu_wait_queue waitqueue;

	/* RCU linked list head of siblings for work stealing. */
	struct cds_list_head sibling_head;
	pthread_mutex_t sibling_lock;	/* Protect sibling list updates */

	/* Maximum number of work entries (approximate). 0 means infinite. */
	unsigned long nr_work_max;
	unsigned long nr_work;		/* Current number of work items */
	bool shutdown;			/* Shutdown performed */
};

struct urcu_worker {
	/* Work queue from which the worker consumes, and from which siblings steal. */
	struct cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Work belonging to the worker. Cannot be stolen. */
	struct urcu_work *own;

	struct urcu_wait_node wait_node;
	/* RCU linked list node of siblings for work stealing. */
	struct cds_list_head sibling_node;
	struct urcu_workqueue *queue;
	int flags;	/* enum urcu_worker_flags */
};

enum urcu_worker_flags {
	URCU_WORKER_STEAL = (1 << 0),
};

static inline
void urcu_workqueue_init(struct urcu_workqueue *queue,
		unsigned long max_queue_len)
{
	__cds_wfcq_init(&queue->head, &queue->tail);
	urcu_wait_queue_init(&queue->waitqueue);
	CDS_INIT_LIST_HEAD(&queue->sibling_head);
	pthread_mutex_init(&queue->sibling_lock, NULL);
	queue->nr_work_max = max_queue_len;
	queue->nr_work = 0;
	queue->shutdown = false;
}

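/*
 * Illustrative usage sketch (the bound of 1024 is an example value).
 * The limit is approximate: concurrent enqueuers check the counter
 * before incrementing it, so the queue length may slightly exceed the
 * bound. Pass 0 as max_queue_len for an unbounded queue.
 *
 *	static struct urcu_workqueue queue;
 *
 *	urcu_workqueue_init(&queue, 1024);
 */
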
static inline
enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
		struct urcu_work *work)
{
	bool was_empty;
	unsigned long nr_work_max;

	nr_work_max = queue->nr_work_max;
	if (nr_work_max) {
		/* Approximate max queue size. */
		if (uatomic_read(&queue->nr_work) >= nr_work_max)
			return URCU_ENQUEUE_FULL;
		uatomic_inc(&queue->nr_work);
	}
	cds_wfcq_node_init(&work->node);

	/* Enqueue work. */
	was_empty = !cds_wfcq_enqueue(&queue->head, &queue->tail,
			&work->node);
	/*
	 * If the workqueue was previously empty, wake up one worker
	 * thread. It will eventually grab the entire content of the
	 * work queue (therefore grabbing a "work batch"). While that
	 * thread is running and taking care of its work batch, each
	 * time more work is enqueued we wake another thread (if one is
	 * waiting), which will eventually grab the new batch, and so
	 * on. This scheme ensures that contiguous batches of work are
	 * handled by the same thread (for locality), and also ensures
	 * that we scale work to many worker threads when threads are
	 * busy enough to still be running when work is enqueued.
	 */
	if (was_empty) {
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}
	return URCU_ENQUEUE_OK;
}

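/*
 * Illustrative producer sketch ("struct my_work" and "submit" are
 * hypothetical user code, not part of this API): embed struct
 * urcu_work within the user-defined work item, and handle
 * URCU_ENQUEUE_FULL explicitly, since enqueue can fail on a bounded
 * queue.
 *
 *	struct my_work {
 *		struct urcu_work work;
 *		int payload;
 *	};
 *
 *	static enum urcu_enqueue_ret submit(struct urcu_workqueue *queue,
 *			struct my_work *mywork)
 *	{
 *		enum urcu_enqueue_ret ret;
 *
 *		ret = urcu_queue_work(queue, &mywork->work);
 *		if (ret == URCU_ENQUEUE_FULL) {
 *			... retry later, drop the item, or apply
 *			    backpressure to the producer ...
 *		}
 *		return ret;
 *	}
 */
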
static inline
void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
{
	struct urcu_waiters waiters;

	rcu_read_lock();	/* Protect stack dequeue */
	urcu_move_waiters(&waiters, &queue->waitqueue);
	rcu_read_unlock();	/* Protect stack dequeue */

	(void) urcu_wake_all_waiters(&waiters);
}

static inline
void urcu_worker_init(struct urcu_workqueue *queue,
		struct urcu_worker *worker, int flags)
{
	cds_wfcq_init(&worker->head, &worker->tail);
	worker->flags = flags;
	urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
	worker->own = NULL;
	worker->wait_node.node.next = NULL;
	worker->queue = queue;
}

static inline
void urcu_worker_register(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_add_rcu(&worker->sibling_node, &queue->sibling_head);
		pthread_mutex_unlock(&queue->sibling_lock);
	}
}

static inline
void urcu_worker_unregister(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	enum cds_wfcq_ret wfcq_ret;

	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_del_rcu(&worker->sibling_node);
		pthread_mutex_unlock(&queue->sibling_lock);
	}

	/*
	 * Make sure we are removed from the waitqueue.
	 */
	if (CMM_LOAD_SHARED(worker->wait_node.node.next))
		__urcu_workqueue_wakeup_all(queue);

	/*
	 * Put any local work we still have back into the workqueue.
	 */
	wfcq_ret = __cds_wfcq_splice_blocking(&queue->head,
			&queue->tail,
			&worker->head,
			&worker->tail);
	if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
			&& wfcq_ret == CDS_WFCQ_RET_DEST_EMPTY) {
		/*
		 * Wake up a worker thread if we have put work back
		 * into a workqueue that was previously empty.
		 */
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}

	/*
	 * Wait for a grace period before freeing or reusing "worker",
	 * because it is used by an RCU linked list. This also prevents
	 * ABA on the waitqueue stack dequeue: it matches the RCU
	 * read-side critical sections around the dequeue and move-all
	 * operations on the waitqueue.
	 */
	synchronize_rcu();
}

static inline
bool ___urcu_grab_work(struct urcu_worker *worker,
		cds_wfcq_head_ptr_t src_head,
		struct cds_wfcq_tail *src_tail,
		bool steal)
{
	enum cds_wfcq_ret splice_ret;
	struct __cds_wfcq_head tmp_head;
	struct cds_wfcq_tail tmp_tail;
	struct cds_wfcq_node *node;

	/*
	 * Don't bother grabbing the src queue lock if it is empty.
	 */
	if (cds_wfcq_empty(src_head, src_tail))
		return false;
	__cds_wfcq_init(&tmp_head, &tmp_tail);

	/* Ensure that we preserve FIFO work order. */
	assert(!steal || worker->own == NULL);

	/* Splice to temporary queue. */
	if (steal)
		cds_wfcq_dequeue_lock(src_head.h, src_tail);
	splice_ret = __cds_wfcq_splice_blocking(&tmp_head,
			&tmp_tail,
			src_head,
			src_tail);
	if (steal)
		cds_wfcq_dequeue_unlock(src_head.h, src_tail);
	if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY)
		return false;

	/*
	 * Keep one work entry for ourself. This ensures forward
	 * progress amongst stealing co-workers. This also ensures that
	 * when a worker grabs some work from the global workqueue, it
	 * will have at least one work item to deal with.
	 */
	if (worker->own == NULL) {
		if (!steal) {
			/*
			 * Try to grab own work from the worker
			 * workqueue to preserve FIFO order.
			 */
			node = cds_wfcq_dequeue_blocking(&worker->head,
					&worker->tail);
			if (node)
				goto got_node;
		}
		node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail);
		assert(node != NULL);
got_node:
		worker->own = caa_container_of(node, struct urcu_work, node);
	}

	/* Splice into worker workqueue. */
	splice_ret = __cds_wfcq_splice_blocking(&worker->head,
			&worker->tail,
			&tmp_head,
			&tmp_tail);
	/* Ensure that we preserve FIFO work order. */
	assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
	return true;
}

/*
 * Try stealing work from siblings when we have nothing to do.
 */
static inline
bool ___urcu_steal_work(struct urcu_worker *worker,
		struct urcu_worker *sibling)
{
	return ___urcu_grab_work(worker, &sibling->head, &sibling->tail,
			true);
}

static inline
bool __urcu_steal_work(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool steal_performed = false;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		steal_performed = ___urcu_steal_work(worker, sibling_next);
	if (steal_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		steal_performed = ___urcu_steal_work(worker, sibling_prev);
end:
	rcu_read_unlock();

	return steal_performed;
}

static inline
bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
{
	return urcu_adaptative_wake_up(&sibling->wait_node);
}

static inline
bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool wakeup_performed = false;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	/* Only wake up siblings if we have work in our own queue. */
	if (cds_wfcq_empty(&worker->head, &worker->tail))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
	if (wakeup_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
end:
	rcu_read_unlock();

	return wakeup_performed;
}

static inline
enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	bool has_work;

	has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail,
			false);
	/* Don't wait if we have work to do. */
	if (has_work || worker->own
			|| !cds_wfcq_empty(&worker->head, &worker->tail))
		goto do_work;
	/* Try to steal work from siblings instead of blocking. */
	if (__urcu_steal_work(queue, worker))
		goto do_work;
	/* No more work to do, check shutdown state. */
	if (CMM_LOAD_SHARED(queue->shutdown))
		return URCU_ACCEPT_SHUTDOWN;
	urcu_wait_set_state(&worker->wait_node,
			URCU_WAIT_WAITING);
	if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
		int was_empty;

		/*
		 * NULL next pointer. We are therefore not in
		 * the queue.
		 */
		cds_lfs_node_init(&worker->wait_node.node);
		/* Protect stack dequeue against ABA. */
		synchronize_rcu();
		was_empty = !urcu_wait_add(&queue->waitqueue,
				&worker->wait_node);
		/*
		 * If the wait queue was empty, it means we are the
		 * first thread to be put back into an otherwise empty
		 * wait queue. Re-check whether the work queue is empty
		 * after adding ourself to the wait queue, so we can
		 * wake up the top of the wait queue: new work may have
		 * appeared, and the work enqueuer may not have seen
		 * that it needed to do a wakeup.
		 */
		if (was_empty && !cds_wfcq_empty(&queue->head,
				&queue->tail)) {
			rcu_read_lock();	/* Protect stack dequeue */
			(void) urcu_dequeue_wake_single(&queue->waitqueue);
			rcu_read_unlock();	/* Protect stack dequeue */
		}
	} else {
		/*
		 * Non-NULL next pointer. We are therefore in
		 * the queue, or the dispatcher just removed us
		 * from it (after we read the next pointer), and
		 * is therefore awakening us. The state will
		 * therefore have been changed from WAITING to
		 * some other state, which will let the busy
		 * wait pass through.
		 */
	}
	urcu_adaptative_busy_wait(&worker->wait_node);
	/*
	 * We have been woken up: either new work is available or
	 * shutdown has been requested. Report work availability; the
	 * caller is expected to call urcu_accept_work() again, which
	 * re-checks the shutdown state.
	 */
	return URCU_ACCEPT_WORK;

do_work:
	/*
	 * We will be busy handling the work batch, awaken siblings so
	 * they can steal from us.
	 */
	(void) __urcu_wakeup_siblings(queue, worker);
	return URCU_ACCEPT_WORK;
}

static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	struct cds_wfcq_node *node;
	struct urcu_work *work;

	if (worker->own) {
		/* Process our own work entry. */
		work = worker->own;
		worker->own = NULL;
		goto end;
	}
	/*
	 * If we are registered for work stealing, we need to dequeue
	 * safely against siblings.
	 */
	if (worker->flags & URCU_WORKER_STEAL) {
		/*
		 * Don't bother grabbing the worker queue lock if it is
		 * empty.
		 */
		if (cds_wfcq_empty(&worker->head, &worker->tail))
			return NULL;
		node = cds_wfcq_dequeue_blocking(&worker->head,
				&worker->tail);
	} else {
		node = ___cds_wfcq_dequeue_with_state(&worker->head,
				&worker->tail, NULL, 1, 0);
	}
	if (!node)
		return NULL;
	work = caa_container_of(node, struct urcu_work, node);
end:
	if (queue->nr_work_max)
		uatomic_dec(&queue->nr_work);
	return work;
}

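/*
 * Illustrative worker loop sketch: assumes a registered struct
 * urcu_worker named "worker" (as in the lifecycle sketch near the top
 * of this file), the hypothetical "struct my_work" embedding type from
 * the producer sketch above, and a hypothetical user callback
 * "handle". Block until work is available or shutdown is requested,
 * then drain the local work batch.
 *
 *	for (;;) {
 *		struct urcu_work *work;
 *
 *		if (urcu_accept_work(&worker) == URCU_ACCEPT_SHUTDOWN)
 *			break;
 *		while ((work = urcu_dequeue_work(&worker)) != NULL) {
 *			struct my_work *mywork =
 *				caa_container_of(work, struct my_work, work);
 *
 *			handle(mywork);
 *		}
 *	}
 */
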
static inline
void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
{
	/* Set shutdown */
	CMM_STORE_SHARED(queue->shutdown, true);
	/* Wakeup all workers */
	__urcu_workqueue_wakeup_all(queue);
}

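/*
 * Illustrative shutdown sketch (the thread bookkeeping variables are
 * assumptions of this example): request shutdown, then join the worker
 * threads. Each worker observes URCU_ACCEPT_SHUTDOWN from
 * urcu_accept_work() once no more work remains, unregisters itself,
 * and exits.
 *
 *	urcu_workqueue_shutdown(&queue);
 *	for (i = 0; i < nr_workers; i++)
 *		pthread_join(worker_id[i], NULL);
 */
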
#endif /* _URCU_WORKQUEUE_FIFO_H */