From 9340c38dbff1b407f35008f7f585a238fbd4de1c Mon Sep 17 00:00:00 2001
From: Mathieu Desnoyers
Date: Sat, 1 Mar 2014 11:33:25 -0500
Subject: [PATCH] Fix: high cpu usage in synchronize_rcu with long RCU read-side C.S.

We noticed that this kind of scenario:

- application using urcu-mb, urcu-membarrier, urcu-signal, or urcu-bp,
- long RCU read-side critical sections, caused by e.g. long network I/O
  system calls,
- other short-lived RCU critical sections running in other threads,
- very frequent invocation of call_rcu to enqueue callbacks,

leads to abnormally high CPU usage within synchronize_rcu() in the
call_rcu worker threads.

Inspection of the code gives us the answer: in urcu.c, when we need to
wait on a futex (wait_gp()), we expect to be able to end the grace
period within the next loop, having been notified by a
rcu_read_unlock(). However, this is not always the case: we can very
well be awakened by a rcu_read_unlock() executed by a thread running
short-lived RCU read-side critical sections, while the long-running RCU
read-side C.S. is still active. We end up in a situation where we
busy-wait for a very long time, because the counter stays !=
RCU_QS_ACTIVE_ATTEMPTS until a 32-bit overflow happens (or, more
likely, until we complete the grace period).

We need to change the wait_loops == RCU_QS_ACTIVE_ATTEMPTS check into
an inequality, so that wait_gp() is used for every attempt beyond
RCU_QS_ACTIVE_ATTEMPTS loops.

urcu-bp.c also has this issue. Moreover, it uses usleep() rather than
poll() when dealing with long-running RCU read-side critical sections.
Turn the 1000us (1ms) usleep into a 10ms poll. One advantage of using
poll() rather than usleep() is that it does not interact with SIGALRM.

urcu-qsbr.c already checks for wait_loops >= RCU_QS_ACTIVE_ATTEMPTS, so
it is not affected by this issue. Looking into these loops, however,
shows that overflow of the loop counter, although unlikely, would bring
us back to a situation of high CPU usage (the signed counter would wrap
to a negative value, well below RCU_QS_ACTIVE_ATTEMPTS). Therefore,
change the counter behavior so it stops incrementing when it reaches
RCU_QS_ACTIVE_ATTEMPTS, eliminating the overflow.

Signed-off-by: Mathieu Desnoyers
---
 urcu-bp.c   | 14 ++++++++------
 urcu-qsbr.c |  5 +++--
 urcu.c      | 35 +++++++++++++++++++----------------
 3 files changed, 30 insertions(+), 24 deletions(-)
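To see the shape of the fixed wait loop in isolation, here is a minimal,
self-contained sketch. It is not part of the patch: apart from the
RCU_QS_ACTIVE_ATTEMPTS and RCU_SLEEP_DELAY_MS names and the poll() idiom
taken from the diffs below, every identifier is a made-up stand-in and the
constant values are illustrative. The counter saturates at the threshold,
and the >= test makes every later attempt block instead of spinning:

/*
 * Illustrative sketch only -- not part of the patch.  It mimics the
 * fixed wait loop: the counter saturates at RCU_QS_ACTIVE_ATTEMPTS
 * (no 32-bit overflow) and the ">=" test makes every attempt past the
 * threshold block instead of busy-waiting.  The "reader" is a plain
 * countdown; real liburcu scans a registry of per-thread counters and
 * blocks in wait_gp() on a futex (urcu.c) or sleeps in poll() (urcu-bp.c).
 */
#include <poll.h>
#include <stdio.h>

#define RCU_QS_ACTIVE_ATTEMPTS	100	/* same name as the liburcu constant; value illustrative */
#define RCU_SLEEP_DELAY_MS	10	/* matches the new urcu-bp.c value */

static int fake_reader_iterations = 150;	/* stand-in for a long-running reader */

static int readers_still_active(void)
{
	return --fake_reader_iterations > 0;
}

static void wait_for_readers_sketch(void)
{
	unsigned int wait_loops = 0;
	unsigned long spins = 0, blocks = 0;

	for (;;) {
		/* Saturate rather than increment forever. */
		if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
			wait_loops++;

		if (!readers_still_active())
			break;

		if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
			/* Block: a 10ms poll() here, wait_gp() in urcu.c. */
			(void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);
			blocks++;
		} else {
			spins++;	/* caa_cpu_relax() in liburcu */
		}
	}
	printf("busy-wait iterations: %lu, blocking waits: %lu\n",
		spins, blocks);
}

int main(void)
{
	wait_for_readers_sketch();
	return 0;
}

With the old == test, a wakeup caused by a short-lived reader would push the
counter past the threshold and the loop would spin until the long-running
reader finished; with >= plus the saturating counter, it simply blocks again.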
diff --git a/urcu-bp.c b/urcu-bp.c
index 369beba..1856d3f 100644
--- a/urcu-bp.c
+++ b/urcu-bp.c
@@ -79,8 +79,8 @@ void *mremap_wrapper(void *old_address, size_t old_size,
 }
 #endif
 
-/* Sleep delay in us */
-#define RCU_SLEEP_DELAY		1000
+/* Sleep delay in ms */
+#define RCU_SLEEP_DELAY_MS	10
 #define INIT_NR_THREADS 8
 #define ARENA_INIT_ALLOC		\
 	sizeof(struct registry_chunk)	\
@@ -169,7 +169,7 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 			struct cds_list_head *cur_snap_readers,
 			struct cds_list_head *qsreaders)
 {
-	int wait_loops = 0;
+	unsigned int wait_loops = 0;
 	struct rcu_reader *index, *tmp;
 
 	/*
@@ -178,7 +178,9 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 	 * rcu_gp.ctr value.
 	 */
 	for (;;) {
-		wait_loops++;
+		if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
+			wait_loops++;
+
 		cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
 			switch (rcu_reader_state(&index->ctr)) {
 			case RCU_READER_ACTIVE_CURRENT:
@@ -205,8 +207,8 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 		if (cds_list_empty(input_readers)) {
 			break;
 		} else {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
-				usleep(RCU_SLEEP_DELAY);
+			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
+				(void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);
 			else
 				caa_cpu_relax();
 		}
diff --git a/urcu-qsbr.c b/urcu-qsbr.c
index d6adc5b..cd3beff 100644
--- a/urcu-qsbr.c
+++ b/urcu-qsbr.c
@@ -121,7 +121,7 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 			struct cds_list_head *cur_snap_readers,
 			struct cds_list_head *qsreaders)
 {
-	int wait_loops = 0;
+	unsigned int wait_loops = 0;
 	struct rcu_reader *index, *tmp;
 
 	/*
@@ -130,7 +130,6 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 	 * current rcu_gp.ctr value.
 	 */
 	for (;;) {
-		wait_loops++;
 		if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
 			uatomic_set(&rcu_gp.futex, -1);
 			/*
@@ -143,6 +142,8 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 			}
 			/* Write futex before read reader_gp */
 			cmm_smp_mb();
+		} else {
+			wait_loops++;
 		}
 		cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
 			switch (rcu_reader_state(&index->ctr)) {
diff --git a/urcu.c b/urcu.c
index a016bbd..d6dec1a 100644
--- a/urcu.c
+++ b/urcu.c
@@ -53,9 +53,9 @@
 /*
  * If a reader is really non-cooperative and refuses to commit its
  * rcu_active_readers count to memory (there is no barrier in the reader
- * per-se), kick it after a few loops waiting for it.
+ * per-se), kick it after 10 loops waiting for it.
  */
-#define KICK_READER_LOOPS 10000
+#define KICK_READER_LOOPS 10
 
 /*
  * Active attempts to check for reader Q.S. before calling futex().
@@ -230,8 +230,11 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 			struct cds_list_head *cur_snap_readers,
 			struct cds_list_head *qsreaders)
 {
-	int wait_loops = 0;
+	unsigned int wait_loops = 0;
 	struct rcu_reader *index, *tmp;
+#ifdef HAS_INCOHERENT_CACHES
+	unsigned int wait_gp_loops = 0;
+#endif /* HAS_INCOHERENT_CACHES */
 
 	/*
 	 * Wait for each thread URCU_TLS(rcu_reader).ctr to either
@@ -239,11 +242,12 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 	 * rcu_gp.ctr value.
 	 */
 	for (;;) {
-		wait_loops++;
-		if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+		if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
 			uatomic_dec(&rcu_gp.futex);
 			/* Write futex before read reader_gp */
 			smp_mb_master(RCU_MB_GROUP);
+		} else {
+			wait_loops++;
 		}
 
 		cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
@@ -271,14 +275,14 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 
 #ifndef HAS_INCOHERENT_CACHES
 		if (cds_list_empty(input_readers)) {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
 				/* Read reader_gp before write futex */
 				smp_mb_master(RCU_MB_GROUP);
 				uatomic_set(&rcu_gp.futex, 0);
 			}
 			break;
 		} else {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
+			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
 				wait_gp();
 			else
 				caa_cpu_relax();
@@ -290,22 +294,21 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 		 * for too long.
 		 */
 		if (cds_list_empty(input_readers)) {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
 				/* Read reader_gp before write futex */
 				smp_mb_master(RCU_MB_GROUP);
 				uatomic_set(&rcu_gp.futex, 0);
 			}
 			break;
 		} else {
-			switch (wait_loops) {
-			case RCU_QS_ACTIVE_ATTEMPTS:
-				wait_gp();
-				break; /* only escape switch */
-			case KICK_READER_LOOPS:
+			if (wait_gp_loops == KICK_READER_LOOPS) {
 				smp_mb_master(RCU_MB_GROUP);
-				wait_loops = 0;
-				break; /* only escape switch */
-			default:
+				wait_gp_loops = 0;
+			}
+			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
+				wait_gp();
+				wait_gp_loops++;
+			} else {
 				caa_cpu_relax();
 			}
 		}
-- 
2.34.1
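The most intricate hunk is the HAS_INCOHERENT_CACHES branch of urcu.c, where
the switch on wait_loops is replaced by two if statements and a separate
wait_gp_loops counter keeps the KICK_READER_LOOPS kick cadence working now
that wait_loops saturates. A minimal sketch of that control flow, assuming
stub helpers in place of smp_mb_master(RCU_MB_GROUP), wait_gp() and
caa_cpu_relax() (the helper names and constant values here are illustrative,
not liburcu APIs):

/*
 * Illustrative sketch only -- not liburcu code.  It reproduces the
 * control flow of the restructured HAS_INCOHERENT_CACHES wait path:
 * kick non-cooperative readers every KICK_READER_LOOPS blocking waits,
 * block once the saturating wait_loops counter reaches
 * RCU_QS_ACTIVE_ATTEMPTS, and busy-wait before that.
 */
#include <poll.h>
#include <stdio.h>

#define RCU_QS_ACTIVE_ATTEMPTS	100	/* illustrative value */
#define KICK_READER_LOOPS	10	/* new value from the patch */

static unsigned long kicks;

static void force_reader_commit(void) { kicks++; }	/* smp_mb_master() stand-in */
static void block_until_woken(void) { (void) poll(NULL, 0, 1); }	/* wait_gp() stand-in */
static void cpu_relax_hint(void) { }	/* caa_cpu_relax() stand-in */

/* One iteration of the "readers still present" branch. */
static void wait_one_iteration(unsigned int wait_loops, unsigned int *wait_gp_loops)
{
	/*
	 * The kick is driven by its own counter of blocking waits,
	 * because wait_loops now saturates and can no longer be reset.
	 */
	if (*wait_gp_loops == KICK_READER_LOOPS) {
		force_reader_commit();
		*wait_gp_loops = 0;
	}
	if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
		block_until_woken();
		(*wait_gp_loops)++;
	} else {
		cpu_relax_hint();
	}
}

int main(void)
{
	unsigned int wait_loops = 0, wait_gp_loops = 0;
	int i;

	/* Drive 130 iterations: 99 busy-wait passes, then the rest block. */
	for (i = 0; i < 130; i++) {
		if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
			wait_loops++;
		wait_one_iteration(wait_loops, &wait_gp_loops);
	}
	printf("reader kicks issued: %lu\n", kicks);
	return 0;
}

Counting blocking waits separately means the reader kick no longer relies on
resetting wait_loops to 0, which the saturating counter can no longer do.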