waitqueue: add in_waitqueue field
#ifndef _URCU_WORKQUEUE_FIFO_H
#define _URCU_WORKQUEUE_FIFO_H

/*
 * urcu/workqueue-fifo.h
 *
 * Userspace RCU library - work queue scheme with FIFO semantic
 *
 * Copyright (c) 2014 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <urcu/uatomic.h>
#include <urcu/lfstack.h>
#include <urcu/waitqueue-lifo.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
#include <assert.h>

enum urcu_accept_ret {
	URCU_ACCEPT_WORK = 0,
	URCU_ACCEPT_SHUTDOWN = 1,
};

enum urcu_enqueue_ret {
	URCU_ENQUEUE_OK = 0,
	URCU_ENQUEUE_FULL = 1,
};

/*
 * We use RCU to steal work from siblings. Therefore, one of the RCU
 * flavors needs to be included before this header. All workers that
 * participate in stealing (initialized with the URCU_WORKER_STEAL flag)
 * need to be registered as RCU reader threads.
 */

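/*
 * Minimal setup sketch (illustrative only, not part of this header):
 * how a stealing worker thread could be brought up, assuming the
 * default urcu flavor via <urcu.h>. Error handling is omitted; the
 * accept/dequeue loop is shown in a later sketch near
 * urcu_accept_work().
 *
 *	#include <urcu.h>			// pick an RCU flavor first
 *	#include <urcu/workqueue-fifo.h>
 *
 *	static struct urcu_workqueue wq;	// shared work queue
 *
 *	static void *worker_thread(void *arg)
 *	{
 *		struct urcu_worker worker;
 *
 *		rcu_register_thread();		// required when stealing
 *		urcu_worker_init(&wq, &worker);
 *		urcu_worker_register(&wq, &worker);
 *		// ... accept/dequeue loop here ...
 *		urcu_worker_unregister(&wq, &worker);
 *		rcu_unregister_thread();
 *		return NULL;
 *	}
 *
 *	// In the dispatcher: urcu_workqueue_init(&wq, 0, URCU_WORKER_STEAL);
 *	// (0 requests an unbounded queue.)
 */
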
struct urcu_work {
	struct cds_wfcq_node node;
};

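/*
 * Work items are meant to be embedded into a user-defined structure;
 * the enclosing item is recovered with caa_container_of() after
 * dequeue. A hypothetical example (names are illustrative):
 *
 *	struct my_task {
 *		int payload;
 *		struct urcu_work work;
 *	};
 *
 *	struct urcu_work *w = urcu_dequeue_work(&worker);
 *	if (w) {
 *		struct my_task *t = caa_container_of(w, struct my_task, work);
 *		// process t->payload, then free or recycle t
 *	}
 */
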
struct urcu_workqueue {
	/* FIFO work queue */
	struct __cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Associated wait queue for LIFO wait/wakeup */
	struct urcu_wait_queue waitqueue;

	/* RCU linked list head of siblings for work stealing. */
	struct cds_list_head sibling_head;
	pthread_mutex_t sibling_lock;	/* Protect sibling list updates */

	/* Maximum number of work entries (approximate). 0 means infinite. */
	unsigned long nr_work_max;
	unsigned long nr_work;		/* Current number of work items */

	int worker_flags;		/* Worker flags */
	int shutdown;			/* Shutdown performed */
};

struct urcu_worker {
	/* Workqueue which can be either used by worker, or stolen. */
	struct cds_wfcq_head head;
	struct cds_wfcq_tail tail;

	/* Work belonging to worker. Cannot be stolen. */
	struct urcu_work *own;

	struct urcu_wait_node wait_node;
	/* RCU linked list node of siblings for work stealing. */
	struct cds_list_head sibling_node;
	struct urcu_workqueue *queue;
	int flags;			/* enum urcu_worker_flags */
};

enum urcu_worker_flags {
	URCU_WORKER_STEAL = (1 << 0),
};

static inline
void urcu_workqueue_init(struct urcu_workqueue *queue,
		unsigned long max_queue_len,
		int worker_flags)
{
	__cds_wfcq_init(&queue->head, &queue->tail);
	urcu_wait_queue_init(&queue->waitqueue);
	CDS_INIT_LIST_HEAD(&queue->sibling_head);
	pthread_mutex_init(&queue->sibling_lock, NULL);
	queue->nr_work_max = max_queue_len;
	queue->nr_work = 0;
	queue->worker_flags = worker_flags;
	queue->shutdown = false;
}

static inline
enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
		struct urcu_work *work)
{
	bool was_empty;
	unsigned long nr_work_max;

	nr_work_max = queue->nr_work_max;
	if (nr_work_max) {
		/* Approximate max queue size. */
		if (uatomic_read(&queue->nr_work) >= nr_work_max)
			return URCU_ENQUEUE_FULL;
		uatomic_inc(&queue->nr_work);
	}
	cds_wfcq_node_init(&work->node);

	/* Enqueue work. */
	was_empty = !cds_wfcq_enqueue(&queue->head, &queue->tail,
			&work->node);
	/*
	 * If the workqueue was previously empty, wake up one worker
	 * thread. It will eventually grab the entire content of the
	 * work-queue (therefore grabbing a "work batch"). While that
	 * thread is running and taking care of its work batch,
	 * enqueuing more work will wake up another thread (if there is
	 * one waiting), which will eventually grab the new batch, and
	 * so on. This scheme ensures that contiguous batches of work
	 * are handled by the same thread (for locality), and also
	 * ensures that we scale work to many worker threads when
	 * threads are busy enough to still be running when work is
	 * enqueued.
	 */
	if (was_empty) {
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}
	return URCU_ENQUEUE_OK;
}

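/*
 * Dispatcher-side sketch (illustrative only): enqueue a dynamically
 * allocated work item and apply simple backpressure when a bounded
 * queue reports URCU_ENQUEUE_FULL. "struct my_task" and the 10 ms
 * retry interval are hypothetical.
 *
 *	struct my_task *t = malloc(sizeof(*t));
 *
 *	t->payload = 42;
 *	while (urcu_queue_work(&wq, &t->work) == URCU_ENQUEUE_FULL) {
 *		// queue is (approximately) full: retry after a short wait
 *		(void) poll(NULL, 0, 10);
 *	}
 */
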
static inline
void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
{
	struct urcu_waiters waiters;

	rcu_read_lock();	/* Protect stack dequeue */
	urcu_move_waiters(&waiters, &queue->waitqueue);
	rcu_read_unlock();	/* Protect stack dequeue */

	(void) urcu_wake_all_waiters(&waiters);
}

static inline
void urcu_worker_init(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	cds_wfcq_init(&worker->head, &worker->tail);
	urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
	worker->own = NULL;
	worker->wait_node.node.next = NULL;
	worker->queue = queue;
	worker->flags = queue->worker_flags;
}

static inline
void urcu_worker_register(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_add_rcu(&worker->sibling_node, &queue->sibling_head);
		pthread_mutex_unlock(&queue->sibling_lock);
	}
}

static inline
void urcu_worker_unregister(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	enum cds_wfcq_ret wfcq_ret;

	if (worker->flags & URCU_WORKER_STEAL) {
		pthread_mutex_lock(&queue->sibling_lock);
		cds_list_del_rcu(&worker->sibling_node);
		pthread_mutex_unlock(&queue->sibling_lock);
	}

	/*
	 * Make sure we are removed from the waitqueue.
	 */
	if (urcu_in_waitqueue(&worker->wait_node))
		__urcu_workqueue_wakeup_all(queue);

	/*
	 * Put any local work we still have back into the workqueue.
	 */
	wfcq_ret = __cds_wfcq_splice_blocking(&queue->head,
			&queue->tail,
			&worker->head,
			&worker->tail);
	if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
			&& wfcq_ret == CDS_WFCQ_RET_DEST_EMPTY) {
		/*
		 * Wake up a worker thread if we have put work back
		 * into a workqueue that was previously empty.
		 */
		rcu_read_lock();	/* Protect stack dequeue */
		(void) urcu_dequeue_wake_single(&queue->waitqueue);
		rcu_read_unlock();	/* Protect stack dequeue */
	}

	/*
	 * Wait for a grace period before freeing or reusing "worker",
	 * because it is used by the RCU linked list. This also prevents
	 * ABA on the waitqueue stack dequeue: it matches the RCU
	 * read-side critical sections around the dequeue and move-all
	 * operations on the waitqueue.
	 */
	synchronize_rcu();
}

static inline
bool ___urcu_grab_work(struct urcu_worker *worker,
		cds_wfcq_head_ptr_t src_head,
		struct cds_wfcq_tail *src_tail,
		bool steal)
{
	enum cds_wfcq_ret splice_ret;
	struct __cds_wfcq_head tmp_head;
	struct cds_wfcq_tail tmp_tail;
	struct cds_wfcq_node *node;

	/*
	 * Don't bother grabbing the src queue lock if it is empty.
	 */
	if (cds_wfcq_empty(src_head, src_tail))
		return false;
	__cds_wfcq_init(&tmp_head, &tmp_tail);

	/* Ensure that we preserve FIFO work order. */
	assert(!steal || worker->own == NULL);

	/* Splice to temporary queue. */
	if (steal)
		cds_wfcq_dequeue_lock(src_head.h, src_tail);
	splice_ret = __cds_wfcq_splice_blocking(&tmp_head,
			&tmp_tail,
			src_head,
			src_tail);
	if (steal)
		cds_wfcq_dequeue_unlock(src_head.h, src_tail);
	if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY)
		return false;

	/*
	 * Keep one work entry for ourselves. This ensures forward
	 * progress amongst stealing co-workers. It also ensures that
	 * when a worker grabs some work from the global workqueue, it
	 * will have at least one work item to deal with.
	 */
	if (worker->own == NULL) {
		if (!steal) {
			/*
			 * Try to grab own work from the worker
			 * workqueue to preserve FIFO order.
			 */
			node = cds_wfcq_dequeue_blocking(&worker->head,
					&worker->tail);
			if (node)
				goto got_node;
		}
		node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail);
		assert(node != NULL);
got_node:
		worker->own = caa_container_of(node, struct urcu_work, node);
	}

	/* Splice into worker workqueue. */
	splice_ret = __cds_wfcq_splice_blocking(&worker->head,
			&worker->tail,
			&tmp_head,
			&tmp_tail);
	/* Ensure that we preserve FIFO work order. */
	assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
	return true;
}

/*
 * Try stealing work from siblings when we have nothing to do.
 */
static inline
bool ___urcu_steal_work(struct urcu_worker *worker,
		struct urcu_worker *sibling)
{
	return ___urcu_grab_work(worker, &sibling->head, &sibling->tail, 1);
}

static inline
bool __urcu_steal_work(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool steal_performed = 0;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		steal_performed = ___urcu_steal_work(worker, sibling_next);
	if (steal_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		steal_performed = ___urcu_steal_work(worker, sibling_prev);
end:
	rcu_read_unlock();

	return steal_performed;
}

static inline
bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
{
	return urcu_adaptative_wake_up(&sibling->wait_node);
}

static inline
bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling_prev, *sibling_next;
	struct cds_list_head *sibling_node;
	bool wakeup_performed = 0;

	if (!(worker->flags & URCU_WORKER_STEAL))
		return false;

	/* Only wake up siblings if we have work in our own queue. */
	if (cds_wfcq_empty(&worker->head, &worker->tail))
		return false;

	rcu_read_lock();

	sibling_node = rcu_dereference(worker->sibling_node.next);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->next);
	sibling_next = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_next != worker)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
	if (wakeup_performed)
		goto end;

	sibling_node = rcu_dereference(worker->sibling_node.prev);
	if (sibling_node == &queue->sibling_head)
		sibling_node = rcu_dereference(sibling_node->prev);
	sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
			sibling_node);
	if (sibling_prev != worker && sibling_prev != sibling_next)
		wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
end:
	rcu_read_unlock();

	return wakeup_performed;
}

static inline
enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	bool has_work;

	has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
	/* Don't wait if we have work to do. */
	if (has_work || worker->own
			|| !cds_wfcq_empty(&worker->head, &worker->tail))
		goto do_work;
	/* Try to steal work from a sibling instead of blocking. */
	if (__urcu_steal_work(queue, worker))
		goto do_work;
	/* No more work to do, check shutdown state. */
	if (CMM_LOAD_SHARED(queue->shutdown))
		return URCU_ACCEPT_SHUTDOWN;
	urcu_wait_set_state(&worker->wait_node,
			URCU_WAIT_WAITING);
	if (!urcu_in_waitqueue(&worker->wait_node)) {
		int was_empty;

		/* Protect stack dequeue against ABA. */
		synchronize_rcu();
		was_empty = !urcu_wait_add(&queue->waitqueue,
				&worker->wait_node);
		/*
		 * If the wait queue was empty, it means we are the
		 * first thread to be put back into an otherwise empty
		 * wait queue. Re-check whether the work queue is empty
		 * after adding ourselves to the wait queue, so we can
		 * wake up the top of the wait queue if new work has
		 * appeared, since the work enqueuer may not have seen
		 * that it needed to do a wakeup.
		 */
		if (was_empty && !cds_wfcq_empty(&queue->head,
				&queue->tail)) {
			rcu_read_lock();	/* Protect stack dequeue */
			(void) urcu_dequeue_wake_single(&queue->waitqueue);
			rcu_read_unlock();	/* Protect stack dequeue */
		}
	} else {
		/*
		 * We are in the queue, or the dispatcher just removed
		 * us from it (after we read the next pointer), and is
		 * therefore awakening us. The state will therefore have
		 * been changed from WAITING to some other state, which
		 * will let the busy wait pass through.
		 */
	}
	urcu_adaptative_busy_wait(&worker->wait_node);
	return URCU_ACCEPT_WORK;

do_work:
	/*
	 * We will be busy handling the work batch, awaken siblings so
	 * they can steal from us.
	 */
	(void) __urcu_wakeup_siblings(queue, worker);
	return URCU_ACCEPT_WORK;
}

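/*
 * Worker-side sketch (illustrative only): the accept/dequeue loop a
 * worker thread might run until shutdown. handle_task() and
 * "struct my_task" are hypothetical user code.
 *
 *	for (;;) {
 *		struct urcu_work *w;
 *
 *		if (urcu_accept_work(&worker) == URCU_ACCEPT_SHUTDOWN)
 *			break;
 *		while ((w = urcu_dequeue_work(&worker)) != NULL) {
 *			struct my_task *t =
 *				caa_container_of(w, struct my_task, work);
 *
 *			handle_task(t);
 *		}
 *	}
 */
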
static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
	struct urcu_workqueue *queue = worker->queue;
	struct cds_wfcq_node *node;
	struct urcu_work *work;

	if (worker->own) {
		/* Process our own work entry. */
		work = worker->own;
		worker->own = NULL;
		goto end;
	}
	/*
	 * If we are registered for work stealing, we need to dequeue
	 * safely against siblings.
	 */
	if (worker->flags & URCU_WORKER_STEAL) {
		/*
		 * Don't bother grabbing the worker queue lock if it is
		 * empty.
		 */
		if (cds_wfcq_empty(&worker->head, &worker->tail))
			return NULL;
		node = cds_wfcq_dequeue_blocking(&worker->head,
				&worker->tail);
	} else {
		node = ___cds_wfcq_dequeue_with_state(&worker->head,
				&worker->tail, NULL, 1, 0);
	}
	if (!node)
		return NULL;
	work = caa_container_of(node, struct urcu_work, node);
end:
	if (queue->nr_work_max)
		uatomic_dec(&queue->nr_work);
	return work;
}

static inline
void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
{
	/* Set shutdown */
	CMM_STORE_SHARED(queue->shutdown, true);
	/* Wakeup all workers */
	__urcu_workqueue_wakeup_all(queue);
}

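/*
 * Shutdown sketch (illustrative only): the dispatcher flags shutdown,
 * which wakes every waiting worker; once a worker has drained its
 * work, urcu_accept_work() returns URCU_ACCEPT_SHUTDOWN and its loop
 * exits. "tids" and "nr_workers" are hypothetical.
 *
 *	urcu_workqueue_shutdown(&wq);
 *	for (i = 0; i < nr_workers; i++)
 *		(void) pthread_join(tids[i], NULL);
 */
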
/*
 * Used to let the dispatcher steal work from the entire queue in case
 * of stall. The "worker" parameter needs to be initialized, but is
 * usually not registered.
 */
static inline
bool urcu_workqueue_steal_all(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling;
	bool has_work = false;

	if (worker->flags & URCU_WORKER_STEAL) {
		rcu_read_lock();
		/* Steal from each worker */
		cds_list_for_each_entry_rcu(sibling, &queue->sibling_head,
				sibling_node)
			has_work |= ___urcu_grab_work(worker, &sibling->head,
					&sibling->tail, 1);
		rcu_read_unlock();
	}

	/* Steal from global workqueue */
	has_work |= ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
	return has_work;
}

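/*
 * Stall-recovery sketch (illustrative only): a dispatcher can drain
 * stalled workers into a private, initialized but unregistered worker
 * and process the recovered items itself. The calling thread must be a
 * registered RCU reader when stealing; names other than the API calls
 * are hypothetical.
 *
 *	struct urcu_worker rescue;
 *
 *	urcu_worker_init(&wq, &rescue);	// initialized, not registered
 *	if (urcu_workqueue_steal_all(&wq, &rescue)) {
 *		struct urcu_work *w;
 *
 *		while ((w = urcu_dequeue_work(&rescue)) != NULL)
 *			handle_task(caa_container_of(w, struct my_task, work));
 *	}
 */
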
#endif /* _URCU_WORKQUEUE_FIFO_H */