projects
/
userspace-rcu.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
workqueue: preserve FIFO work order
[userspace-rcu.git]
/
urcu
/
workqueue-fifo.h
diff --git
a/urcu/workqueue-fifo.h
b/urcu/workqueue-fifo.h
index 65d3a2e222cc0fbe991c986bce41f4e35722b309..bc19967a6a062eebc5d9c2d33598caab3f2a8f64 100644
(file)
--- a/
urcu/workqueue-fifo.h
+++ b/
urcu/workqueue-fifo.h
@@
-29,6
+29,7
@@
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
+#include <assert.h>
/*
* We use RCU to steal work from siblings. Therefore, one of RCU flavors
/*
* We use RCU to steal work from siblings. Therefore, one of RCU flavors
@@
-170,26
+171,37
@@
void urcu_worker_unregister(struct urcu_workqueue *queue,
* Try stealing work from siblings when we have nothing to do.
*/
static inline
* Try stealing work from siblings when we have nothing to do.
*/
static inline
-
void
___urcu_steal_work(struct urcu_worker *worker,
+
bool
___urcu_steal_work(struct urcu_worker *worker,
struct urcu_worker *sibling)
{
struct urcu_worker *sibling)
{
+ enum cds_wfcq_ret splice_ret;
+
+ /*
+ * Don't bother grabbing the sibling queue lock if it is empty.
+ */
+ if (cds_wfcq_empty(&sibling->head, &sibling->tail))
+ return false;
cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
-
(void)
__cds_wfcq_splice_blocking(&worker->head,
+
splice_ret =
__cds_wfcq_splice_blocking(&worker->head,
&worker->tail,
&sibling->head,
&sibling->tail);
cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
&worker->tail,
&sibling->head,
&sibling->tail);
cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
+ /* Ensure that we preserve FIFO work order. */
+ assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
+ return splice_ret != CDS_WFCQ_RET_SRC_EMPTY;
}
static inline
}
static inline
-
int
__urcu_steal_work(struct urcu_workqueue *queue,
+
bool
__urcu_steal_work(struct urcu_workqueue *queue,
struct urcu_worker *worker)
{
struct urcu_worker *sibling_prev, *sibling_next;
struct cds_list_head *sibling_node;
struct urcu_worker *worker)
{
struct urcu_worker *sibling_prev, *sibling_next;
struct cds_list_head *sibling_node;
+ bool steal_performed = 0;
if (!(worker->flags & URCU_WORKER_STEAL))
if (!(worker->flags & URCU_WORKER_STEAL))
- return
0
;
+ return
false
;
rcu_read_lock();
rcu_read_lock();
@@
-199,7
+211,9
@@
int __urcu_steal_work(struct urcu_workqueue *queue,
sibling_next = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_next != worker)
sibling_next = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_next != worker)
- ___urcu_steal_work(worker, sibling_next);
+ steal_performed = ___urcu_steal_work(worker, sibling_next);
+ if (steal_performed)
+ goto end;
sibling_node = rcu_dereference(worker->sibling_node.prev);
if (sibling_node == &queue->sibling_head)
sibling_node = rcu_dereference(worker->sibling_node.prev);
if (sibling_node == &queue->sibling_head)
@@
-207,25
+221,26
@@
int __urcu_steal_work(struct urcu_workqueue *queue,
sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_prev != worker && sibling_prev != sibling_next)
sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_prev != worker && sibling_prev != sibling_next)
- ___urcu_steal_work(worker, sibling_prev);
-
+
steal_performed =
___urcu_steal_work(worker, sibling_prev);
+end:
rcu_read_unlock();
rcu_read_unlock();
- return
!cds_wfcq_empty(&worker->head, &worker->tail)
;
+ return
steal_performed
;
}
static inline
}
static inline
-
void
___urcu_wakeup_sibling(struct urcu_worker *sibling)
+
bool
___urcu_wakeup_sibling(struct urcu_worker *sibling)
{
{
- urcu_adaptative_wake_up(&sibling->wait_node);
+
return
urcu_adaptative_wake_up(&sibling->wait_node);
}
static inline
}
static inline
-
void
__urcu_wakeup_siblings(struct urcu_workqueue *queue,
+
bool
__urcu_wakeup_siblings(struct urcu_workqueue *queue,
struct urcu_worker *worker)
{
struct urcu_worker *sibling_prev, *sibling_next;
struct cds_list_head *sibling_node;
struct urcu_worker *worker)
{
struct urcu_worker *sibling_prev, *sibling_next;
struct cds_list_head *sibling_node;
+ bool wakeup_performed = 0;
if (!(worker->flags & URCU_WORKER_STEAL))
return;
if (!(worker->flags & URCU_WORKER_STEAL))
return;
@@
-242,7
+257,9
@@
void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
sibling_next = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_next != worker)
sibling_next = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_next != worker)
- ___urcu_wakeup_sibling(sibling_next);
+ wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
+ if (wakeup_performed)
+ goto end;
sibling_node = rcu_dereference(worker->sibling_node.prev);
if (sibling_node == &queue->sibling_head)
sibling_node = rcu_dereference(worker->sibling_node.prev);
if (sibling_node == &queue->sibling_head)
@@
-250,9
+267,11
@@
void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_prev != worker && sibling_prev != sibling_next)
sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_prev != worker && sibling_prev != sibling_next)
- ___urcu_wakeup_sibling(sibling_prev);
-
+
wakeup_performed =
___urcu_wakeup_sibling(sibling_prev);
+end:
rcu_read_unlock();
rcu_read_unlock();
+
+ return wakeup_performed;
}
static inline
}
static inline
@@
-319,7
+338,7
@@
do_work:
* We will be busy handling the work batch, awaken siblings so
* they can steal from us.
*/
* We will be busy handling the work batch, awaken siblings so
* they can steal from us.
*/
- __urcu_wakeup_siblings(queue, worker);
+
(void)
__urcu_wakeup_siblings(queue, worker);
}
static inline
}
static inline
@@
-331,12
+350,19
@@
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
* If we are registered for work stealing, we need to dequeue
* safely against siblings.
*/
* If we are registered for work stealing, we need to dequeue
* safely against siblings.
*/
- if (worker->flags & URCU_WORKER_STEAL)
+ if (worker->flags & URCU_WORKER_STEAL) {
+ /*
+ * Don't bother grabbing the worker queue lock if it is
+ * empty.
+ */
+ if (cds_wfcq_empty(&worker->head, &worker->tail))
+ return NULL;
node = cds_wfcq_dequeue_blocking(&worker->head,
&worker->tail);
node = cds_wfcq_dequeue_blocking(&worker->head,
&worker->tail);
- else
+ } else {
node = ___cds_wfcq_dequeue_with_state(&worker->head,
&worker->tail, NULL, 1, 0);
node = ___cds_wfcq_dequeue_with_state(&worker->head,
&worker->tail, NULL, 1, 0);
+ }
if (!node)
return NULL;
return caa_container_of(node, struct urcu_work, node);
if (!node)
return NULL;
return caa_container_of(node, struct urcu_work, node);
This page took
0.026035 seconds
and
4
git commands to generate.