X-Git-Url: https://git.liburcu.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fwaiter.cpp;h=d185e299720617e64e7514d114e33cea4a0eb386;hp=c0ea3392fceea52f1438b6e839c1742e7348503b;hb=f40b76aed659ff694cf948bf8ebd1d4b5741c986;hpb=a968a9e069d76bfb1ac416dc834994c6a82b085e diff --git a/src/common/waiter.cpp b/src/common/waiter.cpp index c0ea3392f..d185e2997 100644 --- a/src/common/waiter.cpp +++ b/src/common/waiter.cpp @@ -7,6 +7,7 @@ */ #include "error.hpp" +#include "macros.hpp" #include "waiter.hpp" #include @@ -41,15 +42,18 @@ void lttng_waiter_wait(struct lttng_waiter *waiter) { unsigned int i; - DBG("Beginning of waiter wait period"); - /* Load and test condition before read state */ + DBG("Beginning of waiter \"wait\" period"); + + /* Load and test condition before read state. */ cmm_smp_rmb(); for (i = 0; i < WAIT_ATTEMPTS; i++) { if (uatomic_read(&waiter->state) != WAITER_WAITING) { goto skip_futex_wait; } + caa_cpu_relax(); } + while (uatomic_read(&waiter->state) == WAITER_WAITING) { if (!futex_noasync( &waiter->state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) { @@ -63,6 +67,7 @@ void lttng_waiter_wait(struct lttng_waiter *waiter) */ continue; } + switch (errno) { case EAGAIN: /* Value already changed. */ @@ -89,13 +94,16 @@ skip_futex_wait: if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) { break; } + caa_cpu_relax(); } + while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) { poll(nullptr, 0, 10); } + LTTNG_ASSERT(uatomic_read(&waiter->state) & WAITER_TEARDOWN); - DBG("End of waiter wait period"); + DBG("End of waiter \"wait\" period"); } /* @@ -103,7 +111,7 @@ skip_futex_wait: * execution. In this scheme, the waiter owns the node memory, and we only allow * it to free this memory when it sees the WAITER_TEARDOWN flag. */ -void lttng_waiter_wake_up(struct lttng_waiter *waiter) +void lttng_waiter_wake(struct lttng_waiter *waiter) { cmm_smp_mb(); LTTNG_ASSERT(uatomic_read(&waiter->state) == WAITER_WAITING); @@ -114,6 +122,38 @@ void lttng_waiter_wake_up(struct lttng_waiter *waiter) abort(); } } + /* Allow teardown of struct urcu_wait memory. */ uatomic_or(&waiter->state, WAITER_TEARDOWN); } + +void lttng_wait_queue_init(struct lttng_wait_queue *queue) +{ + cds_wfs_init(&queue->stack); +} + +void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter) +{ + (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node); +} + +void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue) +{ + cds_wfs_head *waiters; + cds_wfs_node *iter, *iter_n; + + /* Move all waiters from the queue to our local stack. */ + waiters = __cds_wfs_pop_all(&queue->stack); + + /* Wake all waiters in our stack head. */ + cds_wfs_for_each_blocking_safe (waiters, iter, iter_n) { + auto *waiter = lttng::utils::container_of(iter, <tng_waiter::wait_queue_node); + + /* Don't wake already running threads. */ + if (waiter->state & WAITER_RUNNING) { + continue; + } + + lttng_waiter_wake(waiter); + } +}