#ifndef _URCU_WORKQUEUE_FIFO_H
#define _URCU_WORKQUEUE_FIFO_H

/*
 * urcu/workqueue-fifo.h
 *
 * Userspace RCU library - work queue scheme with FIFO semantic
 *
 * Copyright (c) 2014 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <urcu/uatomic.h>
#include <urcu/lfstack.h>
#include <urcu/waitqueue-lifo.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
#include <assert.h>

enum urcu_accept_ret {
	URCU_ACCEPT_WORK = 0,
	URCU_ACCEPT_SHUTDOWN = 1,
};

enum urcu_enqueue_ret {
	URCU_ENQUEUE_OK = 0,
	URCU_ENQUEUE_FULL = 1,
};

/*
 * We use RCU to steal work from siblings. Therefore, one of the RCU
 * flavors needs to be included before this header. All workers that
 * participate in stealing (initialized with the URCU_WORKER_STEAL flag)
 * need to be registered RCU reader threads.
 */
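
/*
 * Example (illustrative sketch, not part of this API): a translation
 * unit typically includes an RCU flavor before this header, and each
 * stealing worker thread registers itself as an RCU reader, e.g.:
 *
 *	#include <urcu.h>		// any RCU flavor, included first
 *	#include <urcu/workqueue-fifo.h>
 *
 *	// in each worker thread using URCU_WORKER_STEAL:
 *	rcu_register_thread();
 *	// ... run the worker loop ...
 *	rcu_unregister_thread();
 */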

struct urcu_work {
	struct cds_wfcq_node node;
};

struct urcu_workqueue {
	/* FIFO work queue */
	struct __cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Associated wait queue for LIFO wait/wakeup */
	struct urcu_wait_queue waitqueue;

	/* RCU linked list head of siblings for work stealing. */
	struct cds_list_head sibling_head;
	pthread_mutex_t sibling_lock;	/* Protect sibling list updates */

	/* Maximum number of work entries (approximate). 0 means infinite. */
	unsigned long nr_work_max;
	unsigned long nr_work;		/* Current number of work items */
	bool shutdown;			/* Shutdown performed */
};

struct urcu_worker {
	/* Workqueue which can be either used by worker, or stolen. */
	struct cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Work belonging to worker. Cannot be stolen. */
	struct urcu_work *own;

	struct urcu_wait_node wait_node;
	/* RCU linked list node of siblings for work stealing. */
	struct cds_list_head sibling_node;
	struct urcu_workqueue *queue;
	int flags;	/* enum urcu_worker_flags */
};

enum urcu_worker_flags {
	URCU_WORKER_STEAL = (1 << 0),
};

static inline
void urcu_workqueue_init(struct urcu_workqueue *queue,
		unsigned long max_queue_len)
{
	__cds_wfcq_init(&queue->head, &queue->tail);
	urcu_wait_queue_init(&queue->waitqueue);
	CDS_INIT_LIST_HEAD(&queue->sibling_head);
	pthread_mutex_init(&queue->sibling_lock, NULL);
	queue->nr_work_max = max_queue_len;
	queue->nr_work = 0;
	queue->shutdown = false;
}
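
/*
 * Example (sketch, "my_queue" is an illustrative name): a dispatcher
 * usually initializes one workqueue shared by all workers. A
 * max_queue_len of 0 means unbounded; a non-zero value is an
 * approximate bound that makes urcu_queue_work() return
 * URCU_ENQUEUE_FULL once reached.
 *
 *	static struct urcu_workqueue my_queue;
 *
 *	urcu_workqueue_init(&my_queue, 128);
 */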

static inline
enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
		struct urcu_work *work)
{
	bool was_empty;
	unsigned long nr_work_max;

	nr_work_max = queue->nr_work_max;
	if (nr_work_max) {
		/* Approximate max queue size. */
		if (uatomic_read(&queue->nr_work) >= nr_work_max)
			return URCU_ENQUEUE_FULL;
		uatomic_inc(&queue->nr_work);
	}
	cds_wfcq_node_init(&work->node);

	/* Enqueue work. */
	was_empty = !cds_wfcq_enqueue(&queue->head, &queue->tail,
			&work->node);
	/*
	 * If the workqueue was previously empty, wake up one worker
	 * thread. It will eventually grab the entire content of the
	 * workqueue (therefore grabbing a "work batch"). While that
	 * thread is running and taking care of that work batch,
	 * enqueueing more work will wake another thread (if one is
	 * waiting), which will eventually grab the new batch, and so
	 * on. This scheme ensures that contiguous batches of work are
	 * handled by the same thread (for locality), and also that
	 * work scales to many worker threads when threads are busy
	 * enough to still be running when more work is enqueued.
	 */
	if (was_empty) {
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}
	return URCU_ENQUEUE_OK;
}
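
/*
 * Example (sketch): enqueueing work. The names my_work_item and
 * my_enqueue are illustrative, not part of this API, and includes and
 * error handling are elided. Work items embed a struct urcu_work;
 * workers later recover the enclosing structure with
 * caa_container_of().
 *
 *	struct my_work_item {
 *		struct urcu_work work;
 *		int payload;
 *	};
 *
 *	static void my_enqueue(struct urcu_workqueue *queue, int payload)
 *	{
 *		struct my_work_item *item;
 *
 *		item = malloc(sizeof(*item));
 *		if (!item)
 *			abort();
 *		item->payload = payload;
 *		if (urcu_queue_work(queue, &item->work) != URCU_ENQUEUE_OK) {
 *			// bounded queue is full; the caller decides the policy
 *			free(item);
 *		}
 *	}
 */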

static inline
void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
{
	struct urcu_waiters waiters;

	rcu_read_lock();	/* Protect stack dequeue */
	urcu_move_waiters(&waiters, &queue->waitqueue);
	rcu_read_unlock();	/* Protect stack dequeue */

	(void) urcu_wake_all_waiters(&waiters);
}

static inline
void urcu_worker_init(struct urcu_workqueue *queue,
		struct urcu_worker *worker, int flags)
{
	cds_wfcq_init(&worker->head, &worker->tail);
	worker->flags = flags;
	urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
	worker->own = NULL;
	worker->wait_node.node.next = NULL;
	worker->queue = queue;
}

static inline
void urcu_worker_register(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_add_rcu(&worker->sibling_node, &queue->sibling_head);
		pthread_mutex_unlock(&queue->sibling_lock);
	}
}

static inline
void urcu_worker_unregister(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	enum cds_wfcq_ret wfcq_ret;

	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_del_rcu(&worker->sibling_node);
		pthread_mutex_unlock(&queue->sibling_lock);
	}

	/*
	 * Make sure we are removed from waitqueue.
	 */
	if (CMM_LOAD_SHARED(worker->wait_node.node.next))
		__urcu_workqueue_wakeup_all(queue);

	/*
	 * Put any local work we still have back into the workqueue.
	 */
	wfcq_ret = __cds_wfcq_splice_blocking(&queue->head,
			&queue->tail,
			&worker->head,
			&worker->tail);
	if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
			&& wfcq_ret == CDS_WFCQ_RET_DEST_EMPTY) {
		/*
		 * Wakeup worker thread if we have put work back into
		 * workqueue that was previously empty.
		 */
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}

	/*
	 * Wait for a grace period before freeing or reusing "worker",
	 * because it is still reachable through the RCU linked list.
	 * This also prevents ABA for the waitqueue stack dequeue: it
	 * matches the RCU read-side critical sections around dequeue
	 * and move-all operations on the waitqueue.
	 */
	synchronize_rcu();
}

static inline
bool ___urcu_grab_work(struct urcu_worker *worker,
		cds_wfcq_head_ptr_t src_head,
		struct cds_wfcq_tail *src_tail,
		bool steal)
{
	enum cds_wfcq_ret splice_ret;
	struct __cds_wfcq_head tmp_head;
	struct cds_wfcq_tail tmp_tail;
	struct cds_wfcq_node *node;

	/*
	 * Don't bother grabbing the src queue lock if it is empty.
	 */
	if (cds_wfcq_empty(src_head, src_tail))
		return false;
	__cds_wfcq_init(&tmp_head, &tmp_tail);

	/* Ensure that we preserve FIFO work order. */
	assert(!steal || worker->own == NULL);

	/* Splice to temporary queue. */
	if (steal)
		cds_wfcq_dequeue_lock(src_head.h, src_tail);
	splice_ret = __cds_wfcq_splice_blocking(&tmp_head,
			&tmp_tail,
			src_head,
			src_tail);
	if (steal)
		cds_wfcq_dequeue_unlock(src_head.h, src_tail);
	if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY)
		return false;

	/*
	 * Keep one work entry for ourselves. This ensures forward
	 * progress amongst stealing co-workers, and also ensures that
	 * when a worker grabs some work from the global workqueue, it
	 * has at least one work item to deal with.
	 */
	if (worker->own == NULL) {
		if (!steal) {
			/*
			 * Try to grab own work from worker workqueue to
			 * preserve FIFO order.
			 */
			node = cds_wfcq_dequeue_blocking(&worker->head,
					&worker->tail);
			if (node)
				goto got_node;
		}
		node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail);
		assert(node != NULL);
got_node:
		worker->own = caa_container_of(node, struct urcu_work, node);
	}

	/* Splice into worker workqueue. */
	splice_ret = __cds_wfcq_splice_blocking(&worker->head,
			&worker->tail,
			&tmp_head,
			&tmp_tail);
	/* Ensure that we preserve FIFO work order. */
	assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
	return true;
}

/*
 * Try stealing work from siblings when we have nothing to do.
 */
static inline
bool ___urcu_steal_work(struct urcu_worker *worker,
		struct urcu_worker *sibling)
{
	return ___urcu_grab_work(worker, &sibling->head, &sibling->tail, 1);
}

static inline
bool __urcu_steal_work(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool steal_performed = 0;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		steal_performed = ___urcu_steal_work(worker, sibling_next);
	if (steal_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		steal_performed = ___urcu_steal_work(worker, sibling_prev);
end:
	rcu_read_unlock();

	return steal_performed;
}

static inline
bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
{
	return urcu_adaptative_wake_up(&sibling->wait_node);
}

static inline
bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool wakeup_performed = 0;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	/* Only wake up siblings if we have work in our own queue. */
	if (cds_wfcq_empty(&worker->head, &worker->tail))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
	if (wakeup_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
end:
	rcu_read_unlock();

	return wakeup_performed;
}

static inline
enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	enum cds_wfcq_ret wfcq_ret;
	bool has_work;

	has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
	/* Don't wait if we have work to do. */
	if (has_work || worker->own
			|| !cds_wfcq_empty(&worker->head, &worker->tail))
		goto do_work;
	/* Try to steal work from sibling instead of blocking */
	if (__urcu_steal_work(queue, worker))
		goto do_work;
	/* No more work to do, check shutdown state */
	if (CMM_LOAD_SHARED(queue->shutdown))
		return URCU_ACCEPT_SHUTDOWN;
	urcu_wait_set_state(&worker->wait_node,
			URCU_WAIT_WAITING);
	if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
		int was_empty;

		/*
		 * NULL next pointer. We are therefore not in
		 * the queue.
		 */
		cds_lfs_node_init(&worker->wait_node.node);
		/* Protect stack dequeue against ABA */
		synchronize_rcu();
		was_empty = !urcu_wait_add(&queue->waitqueue,
				&worker->wait_node);
		/*
		 * If the wait queue was empty, it means we are the
		 * first thread to be put back into an otherwise empty
		 * wait queue. Re-check whether the work queue is empty
		 * after adding ourselves to the wait queue, so we can
		 * wake up the top of the wait queue in case new work
		 * has appeared and the work enqueuer did not see that
		 * it needed to issue a wakeup.
		 */
		if (was_empty && !cds_wfcq_empty(&queue->head,
				&queue->tail)) {
			rcu_read_lock();	/* Protect stack dequeue */
			(void) urcu_dequeue_wake_single(&queue->waitqueue);
			rcu_read_unlock();	/* Protect stack dequeue */
		}
	} else {
		/*
		 * Non-NULL next pointer. We are therefore in
		 * the queue, or the dispatcher just removed us
		 * from it (after we read the next pointer), and
		 * is therefore awakening us. The state will
		 * therefore have been changed from WAITING to
		 * some other state, which will let the busy
		 * wait pass through.
		 */
	}
	urcu_adaptative_busy_wait(&worker->wait_node);
	/*
	 * We were woken up: let the caller attempt to accept and
	 * dequeue work again (shutdown, if any, is detected on the
	 * next call).
	 */
	return URCU_ACCEPT_WORK;

do_work:
	/*
	 * We will be busy handling the work batch, awaken siblings so
	 * they can steal from us.
	 */
	(void) __urcu_wakeup_siblings(queue, worker);
	return URCU_ACCEPT_WORK;
}

static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	struct cds_wfcq_node *node;
	struct urcu_work *work;

	if (worker->own) {
		/* Process our own work entry. */
		work = worker->own;
		worker->own = NULL;
		goto end;
	}
	/*
	 * If we are registered for work stealing, we need to dequeue
	 * safely against siblings.
	 */
	if (worker->flags & URCU_WORKER_STEAL) {
		/*
		 * Don't bother grabbing the worker queue lock if it is
		 * empty.
		 */
		if (cds_wfcq_empty(&worker->head, &worker->tail))
			return NULL;
		node = cds_wfcq_dequeue_blocking(&worker->head,
				&worker->tail);
	} else {
		node = ___cds_wfcq_dequeue_with_state(&worker->head,
				&worker->tail, NULL, 1, 0);
	}
	if (!node)
		return NULL;
	work = caa_container_of(node, struct urcu_work, node);
end:
	if (queue->nr_work_max)
		uatomic_dec(&queue->nr_work);
	return work;
}
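
/*
 * Example (sketch): a worker thread main loop. Names prefixed with
 * "my_" are illustrative (my_work_item is the enqueue example above);
 * how remaining queued work is drained at shutdown is left to the
 * application. With URCU_WORKER_STEAL, the thread must be a registered
 * RCU reader.
 *
 *	static void *my_worker_thread(void *arg)
 *	{
 *		struct urcu_workqueue *queue = arg;
 *		struct urcu_worker worker;
 *
 *		rcu_register_thread();
 *		urcu_worker_init(queue, &worker, URCU_WORKER_STEAL);
 *		urcu_worker_register(queue, &worker);
 *
 *		for (;;) {
 *			struct urcu_work *work;
 *
 *			if (urcu_accept_work(&worker) == URCU_ACCEPT_SHUTDOWN)
 *				break;
 *			while ((work = urcu_dequeue_work(&worker)) != NULL) {
 *				struct my_work_item *item;
 *
 *				item = caa_container_of(work,
 *						struct my_work_item, work);
 *				// ... process item->payload ...
 *				free(item);
 *			}
 *		}
 *
 *		urcu_worker_unregister(queue, &worker);
 *		rcu_unregister_thread();
 *		return NULL;
 *	}
 */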

static inline
void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
{
	/* Set shutdown */
	CMM_STORE_SHARED(queue->shutdown, true);
	/* Wakeup all workers */
	__urcu_workqueue_wakeup_all(queue);
}
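
/*
 * Example (sketch, names illustrative): dispatcher-side shutdown,
 * assuming workers run my_worker_thread() above and were started with
 * pthread_create(). Shutdown wakes all waiting workers, which then see
 * URCU_ACCEPT_SHUTDOWN from urcu_accept_work() and exit their loop.
 *
 *	urcu_workqueue_shutdown(&my_queue);
 *	for (i = 0; i < nr_workers; i++)
 *		pthread_join(worker_tid[i], NULL);
 */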

#endif /* _URCU_WORKQUEUE_FIFO_H */