Cleanup: ring buffer: remove lib_ring_buffer_switch_new_end()
lttng-modules.git: lib/ringbuffer/ring_buffer_frontend.c
1 /*
2 * ring_buffer_frontend.c
3 *
4 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; only
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 *
21 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
22 * recorder (overwrite) modes. See thesis:
23 *
24 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
25 * dissertation, Ecole Polytechnique de Montreal.
26 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
27 *
28 * - Algorithm presentation in Chapter 5:
29 * "Lockless Multi-Core High-Throughput Buffering".
30 * - Algorithm formal verification in Section 8.6:
31 * "Formal verification of LTTng"
32 *
33 * Author:
34 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
35 *
36 * Inspired from LTT and RelayFS:
37 * Karim Yaghmour <karim@opersys.com>
38 * Tom Zanussi <zanussi@us.ibm.com>
39 * Bob Wisniewski <bob@watson.ibm.com>
40 * And from K42 :
41 * Bob Wisniewski <bob@watson.ibm.com>
42 *
43 * Buffer reader semantics:
44 *
45 * - get_subbuf_size
46 * while buffer is not finalized and empty
47 * - get_subbuf
48 * - if return value != 0, continue
49 * - splice one subbuffer worth of data to a pipe
50 * - splice the data from pipe to disk/network
51 * - put_subbuf
52 */
53
54 #include <linux/delay.h>
55 #include <linux/module.h>
56 #include <linux/percpu.h>
57
58 #include "../../wrapper/ringbuffer/config.h"
59 #include "../../wrapper/ringbuffer/backend.h"
60 #include "../../wrapper/ringbuffer/frontend.h"
61 #include "../../wrapper/ringbuffer/iterator.h"
62 #include "../../wrapper/ringbuffer/nohz.h"
63
64 /*
65 * Internal structure representing offsets to use at a sub-buffer switch.
66 */
67 struct switch_offsets {
68 unsigned long begin, end, old;
69 size_t pre_header_padding, size;
70 unsigned int switch_new_start:1, switch_old_start:1, switch_old_end:1;
71 };
72
73 #ifdef CONFIG_NO_HZ
74 enum tick_nohz_val {
75 TICK_NOHZ_STOP,
76 TICK_NOHZ_FLUSH,
77 TICK_NOHZ_RESTART,
78 };
79
80 static ATOMIC_NOTIFIER_HEAD(tick_nohz_notifier);
81 #endif /* CONFIG_NO_HZ */
82
83 static DEFINE_PER_CPU(spinlock_t, ring_buffer_nohz_lock);
84
85 DEFINE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
86 EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
87
88 static
89 void lib_ring_buffer_print_errors(struct channel *chan,
90 struct lib_ring_buffer *buf, int cpu);
91
92 /*
93 * Must be called under cpu hotplug protection.
94 */
95 void lib_ring_buffer_free(struct lib_ring_buffer *buf)
96 {
97 struct channel *chan = buf->backend.chan;
98
99 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
100 kfree(buf->commit_hot);
101 kfree(buf->commit_cold);
102
103 lib_ring_buffer_backend_free(&buf->backend);
104 }
105
106 /**
107 * lib_ring_buffer_reset - Reset ring buffer to initial values.
108 * @buf: Ring buffer.
109 *
110 * Effectively empty the ring buffer. Should be called when the buffer is not
111 * used for writing. The ring buffer can be opened for reading, but the reader
112 * should not be using the iterator concurrently with reset. The previous
113 * current iterator record is reset.
114 */
115 void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
116 {
117 struct channel *chan = buf->backend.chan;
118 const struct lib_ring_buffer_config *config = &chan->backend.config;
119 unsigned int i;
120
121 /*
122 * Reset iterator first. It will put the subbuffer if it currently holds
123 * it.
124 */
125 lib_ring_buffer_iterator_reset(buf);
126 v_set(config, &buf->offset, 0);
127 for (i = 0; i < chan->backend.num_subbuf; i++) {
128 v_set(config, &buf->commit_hot[i].cc, 0);
129 v_set(config, &buf->commit_hot[i].seq, 0);
130 v_set(config, &buf->commit_cold[i].cc_sb, 0);
131 }
132 atomic_long_set(&buf->consumed, 0);
133 atomic_set(&buf->record_disabled, 0);
134 v_set(config, &buf->last_tsc, 0);
135 lib_ring_buffer_backend_reset(&buf->backend);
136 /* Don't reset number of active readers */
137 v_set(config, &buf->records_lost_full, 0);
138 v_set(config, &buf->records_lost_wrap, 0);
139 v_set(config, &buf->records_lost_big, 0);
140 v_set(config, &buf->records_count, 0);
141 v_set(config, &buf->records_overrun, 0);
142 buf->finalized = 0;
143 }
144 EXPORT_SYMBOL_GPL(lib_ring_buffer_reset);
145
146 /**
147 * channel_reset - Reset channel to initial values.
148 * @chan: Channel.
149 *
150 * Effectively empty the channel. Should be called when the channel is not used
151 * for writing. The channel can be opened for reading, but the reader should not
152 * be using the iterator concurrently with reset. The previous current iterator
153 * record is reset.
154 */
155 void channel_reset(struct channel *chan)
156 {
157 /*
158 * Reset iterators first. Will put the subbuffer if held for reading.
159 */
160 channel_iterator_reset(chan);
161 atomic_set(&chan->record_disabled, 0);
162 /* Don't reset commit_count_mask, still valid */
163 channel_backend_reset(&chan->backend);
164 /* Don't reset switch/read timer interval */
165 /* Don't reset notifiers and notifier enable bits */
166 /* Don't reset reader reference count */
167 }
168 EXPORT_SYMBOL_GPL(channel_reset);
169
170 /*
171 * Must be called under cpu hotplug protection.
172 */
173 int lib_ring_buffer_create(struct lib_ring_buffer *buf,
174 struct channel_backend *chanb, int cpu)
175 {
176 const struct lib_ring_buffer_config *config = &chanb->config;
177 struct channel *chan = container_of(chanb, struct channel, backend);
178 void *priv = chanb->priv;
179 size_t subbuf_header_size;
180 u64 tsc;
181 int ret;
182
183 /* Test for cpu hotplug */
184 if (buf->backend.allocated)
185 return 0;
186
187 /*
188 * Paranoia: per cpu dynamic allocation is not officially documented as
189 * zeroing the memory, so let's do it here too, just in case.
190 */
191 memset(buf, 0, sizeof(*buf));
192
193 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu);
194 if (ret)
195 return ret;
196
197 buf->commit_hot =
198 kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
199 * chan->backend.num_subbuf,
200 1 << INTERNODE_CACHE_SHIFT),
201 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
202 if (!buf->commit_hot) {
203 ret = -ENOMEM;
204 goto free_chanbuf;
205 }
206
207 buf->commit_cold =
208 kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
209 * chan->backend.num_subbuf,
210 1 << INTERNODE_CACHE_SHIFT),
211 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
212 if (!buf->commit_cold) {
213 ret = -ENOMEM;
214 goto free_commit;
215 }
216
217 init_waitqueue_head(&buf->read_wait);
218 init_waitqueue_head(&buf->write_wait);
219 raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
220
221 /*
222 * Write the subbuffer header for the first subbuffer so we know the total
223 * duration of data gathering.
224 */
225 subbuf_header_size = config->cb.subbuffer_header_size();
226 v_set(config, &buf->offset, subbuf_header_size);
227 subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id);
228 tsc = config->cb.ring_buffer_clock_read(buf->backend.chan);
229 config->cb.buffer_begin(buf, tsc, 0);
230 v_add(config, subbuf_header_size, &buf->commit_hot[0].cc);
231
232 if (config->cb.buffer_create) {
233 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
234 if (ret)
235 goto free_init;
236 }
237
238 /*
239 * Ensure the buffer is ready before setting it to allocated and setting
240 * the cpumask.
241 * Used for cpu hotplug vs cpumask iteration.
242 */
243 smp_wmb();
244 buf->backend.allocated = 1;
245
246 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
247 CHAN_WARN_ON(chan, cpumask_test_cpu(cpu,
248 chan->backend.cpumask));
249 cpumask_set_cpu(cpu, chan->backend.cpumask);
250 }
251
252 return 0;
253
254 /* Error handling */
255 free_init:
256 kfree(buf->commit_cold);
257 free_commit:
258 kfree(buf->commit_hot);
259 free_chanbuf:
260 lib_ring_buffer_backend_free(&buf->backend);
261 return ret;
262 }
263
264 static void switch_buffer_timer(unsigned long data)
265 {
266 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
267 struct channel *chan = buf->backend.chan;
268 const struct lib_ring_buffer_config *config = &chan->backend.config;
269
270 /*
271 * Only flush buffers periodically if readers are active.
272 */
273 if (atomic_long_read(&buf->active_readers))
274 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
275
276 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
277 mod_timer_pinned(&buf->switch_timer,
278 jiffies + chan->switch_timer_interval);
279 else
280 mod_timer(&buf->switch_timer,
281 jiffies + chan->switch_timer_interval);
282 }
283
284 /*
285 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
286 */
287 static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
288 {
289 struct channel *chan = buf->backend.chan;
290 const struct lib_ring_buffer_config *config = &chan->backend.config;
291
292 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
293 return;
294 init_timer(&buf->switch_timer);
295 buf->switch_timer.function = switch_buffer_timer;
296 buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
297 buf->switch_timer.data = (unsigned long)buf;
298 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
299 add_timer_on(&buf->switch_timer, buf->backend.cpu);
300 else
301 add_timer(&buf->switch_timer);
302 buf->switch_timer_enabled = 1;
303 }
304
305 /*
306 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
307 */
308 static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
309 {
310 struct channel *chan = buf->backend.chan;
311
312 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
313 return;
314
315 del_timer_sync(&buf->switch_timer);
316 buf->switch_timer_enabled = 0;
317 }
318
319 /*
320 * Polling timer to check the channels for data.
321 */
322 static void read_buffer_timer(unsigned long data)
323 {
324 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
325 struct channel *chan = buf->backend.chan;
326 const struct lib_ring_buffer_config *config = &chan->backend.config;
327
328 CHAN_WARN_ON(chan, !buf->backend.allocated);
329
330 if (atomic_long_read(&buf->active_readers)
331 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
332 wake_up_interruptible(&buf->read_wait);
333 wake_up_interruptible(&chan->read_wait);
334 }
335
336 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
337 mod_timer_pinned(&buf->read_timer,
338 jiffies + chan->read_timer_interval);
339 else
340 mod_timer(&buf->read_timer,
341 jiffies + chan->read_timer_interval);
342 }
343
344 /*
345 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
346 */
347 static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
348 {
349 struct channel *chan = buf->backend.chan;
350 const struct lib_ring_buffer_config *config = &chan->backend.config;
351
352 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
353 || !chan->read_timer_interval
354 || buf->read_timer_enabled)
355 return;
356
357 init_timer(&buf->read_timer);
358 buf->read_timer.function = read_buffer_timer;
359 buf->read_timer.expires = jiffies + chan->read_timer_interval;
360 buf->read_timer.data = (unsigned long)buf;
361
362 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
363 add_timer_on(&buf->read_timer, buf->backend.cpu);
364 else
365 add_timer(&buf->read_timer);
366 buf->read_timer_enabled = 1;
367 }
368
369 /*
370 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
371 */
372 static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
373 {
374 struct channel *chan = buf->backend.chan;
375 const struct lib_ring_buffer_config *config = &chan->backend.config;
376
377 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
378 || !chan->read_timer_interval
379 || !buf->read_timer_enabled)
380 return;
381
382 del_timer_sync(&buf->read_timer);
383 /*
384 * do one more check to catch data that has been written in the last
385 * timer period.
386 */
387 if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
388 wake_up_interruptible(&buf->read_wait);
389 wake_up_interruptible(&chan->read_wait);
390 }
391 buf->read_timer_enabled = 0;
392 }
393
394 #ifdef CONFIG_HOTPLUG_CPU
395 /**
396 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
397 * @nb: notifier block
398 * @action: hotplug action to take
399 * @hcpu: CPU number
400 *
401 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
402 */
403 static
404 int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
405 unsigned long action,
406 void *hcpu)
407 {
408 unsigned int cpu = (unsigned long)hcpu;
409 struct channel *chan = container_of(nb, struct channel,
410 cpu_hp_notifier);
411 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
412 const struct lib_ring_buffer_config *config = &chan->backend.config;
413
414 if (!chan->cpu_hp_enable)
415 return NOTIFY_DONE;
416
417 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
418
419 switch (action) {
420 case CPU_DOWN_FAILED:
421 case CPU_DOWN_FAILED_FROZEN:
422 case CPU_ONLINE:
423 case CPU_ONLINE_FROZEN:
424 wake_up_interruptible(&chan->hp_wait);
425 lib_ring_buffer_start_switch_timer(buf);
426 lib_ring_buffer_start_read_timer(buf);
427 return NOTIFY_OK;
428
429 case CPU_DOWN_PREPARE:
430 case CPU_DOWN_PREPARE_FROZEN:
431 lib_ring_buffer_stop_switch_timer(buf);
432 lib_ring_buffer_stop_read_timer(buf);
433 return NOTIFY_OK;
434
435 case CPU_DEAD:
436 case CPU_DEAD_FROZEN:
437 /*
438 * Performing a buffer switch on a remote CPU. Performed by
439 * the CPU responsible for doing the hotunplug after the target
440 * CPU stopped running completely. Ensures that all data
441 * from that remote CPU is flushed.
442 */
443 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
444 return NOTIFY_OK;
445
446 default:
447 return NOTIFY_DONE;
448 }
449 }
450 #endif
451
452 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
453 /*
454 * For per-cpu buffers, call the reader wakeups before switching the buffer, so
455 * that wake-up-tracing generated events are flushed before going idle (in
456 * tick_nohz). We test if the spinlock is locked to deal with the race where
457 * readers try to sample the ring buffer before we perform the switch. We let
458 * the readers retry in that case. If there is data in the buffer, the wake up
459 * is going to forbid the CPU running the reader thread from going idle.
460 */
461 static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb,
462 unsigned long val,
463 void *data)
464 {
465 struct channel *chan = container_of(nb, struct channel,
466 tick_nohz_notifier);
467 const struct lib_ring_buffer_config *config = &chan->backend.config;
468 struct lib_ring_buffer *buf;
469 int cpu = smp_processor_id();
470
471 if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
472 /*
473 * We don't support keeping the system idle with global buffers
474 * and streaming active. In order to do so, we would need to
475 * sample a non-nohz-cpumask racelessly with the nohz updates
476 * without adding synchronization overhead to nohz. Leave this
477 * use-case out for now.
478 */
479 return 0;
480 }
481
482 buf = channel_get_ring_buffer(config, chan, cpu);
483 switch (val) {
484 case TICK_NOHZ_FLUSH:
485 raw_spin_lock(&buf->raw_tick_nohz_spinlock);
486 if (config->wakeup == RING_BUFFER_WAKEUP_BY_TIMER
487 && chan->read_timer_interval
488 && atomic_long_read(&buf->active_readers)
489 && (lib_ring_buffer_poll_deliver(config, buf, chan)
490 || lib_ring_buffer_pending_data(config, buf, chan))) {
491 wake_up_interruptible(&buf->read_wait);
492 wake_up_interruptible(&chan->read_wait);
493 }
494 if (chan->switch_timer_interval)
495 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
496 raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
497 break;
498 case TICK_NOHZ_STOP:
499 spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
500 lib_ring_buffer_stop_switch_timer(buf);
501 lib_ring_buffer_stop_read_timer(buf);
502 spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
503 break;
504 case TICK_NOHZ_RESTART:
505 spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
506 lib_ring_buffer_start_read_timer(buf);
507 lib_ring_buffer_start_switch_timer(buf);
508 spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
509 break;
510 }
511
512 return 0;
513 }
514
515 void notrace lib_ring_buffer_tick_nohz_flush(void)
516 {
517 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_FLUSH,
518 NULL);
519 }
520
521 void notrace lib_ring_buffer_tick_nohz_stop(void)
522 {
523 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_STOP,
524 NULL);
525 }
526
527 void notrace lib_ring_buffer_tick_nohz_restart(void)
528 {
529 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_RESTART,
530 NULL);
531 }
532 #endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
533
534 /*
535 * Holds CPU hotplug.
536 */
537 static void channel_unregister_notifiers(struct channel *chan)
538 {
539 const struct lib_ring_buffer_config *config = &chan->backend.config;
540 int cpu;
541
542 channel_iterator_unregister_notifiers(chan);
543 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
544 #ifdef CONFIG_NO_HZ
545 /*
546 * Remove the nohz notifier first, so we are certain we stop
547 * the timers.
548 */
549 atomic_notifier_chain_unregister(&tick_nohz_notifier,
550 &chan->tick_nohz_notifier);
551 /*
552 * ring_buffer_nohz_lock will not be needed below, because
553 * we just removed the notifiers, which were the only source of
554 * concurrency.
555 */
556 #endif /* CONFIG_NO_HZ */
557 #ifdef CONFIG_HOTPLUG_CPU
558 get_online_cpus();
559 chan->cpu_hp_enable = 0;
560 for_each_online_cpu(cpu) {
561 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
562 cpu);
563 lib_ring_buffer_stop_switch_timer(buf);
564 lib_ring_buffer_stop_read_timer(buf);
565 }
566 put_online_cpus();
567 unregister_cpu_notifier(&chan->cpu_hp_notifier);
568 #else
569 for_each_possible_cpu(cpu) {
570 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
571 cpu);
572 lib_ring_buffer_stop_switch_timer(buf);
573 lib_ring_buffer_stop_read_timer(buf);
574 }
575 #endif
576 } else {
577 struct lib_ring_buffer *buf = chan->backend.buf;
578
579 lib_ring_buffer_stop_switch_timer(buf);
580 lib_ring_buffer_stop_read_timer(buf);
581 }
582 channel_backend_unregister_notifiers(&chan->backend);
583 }
584
585 static void channel_free(struct channel *chan)
586 {
587 channel_iterator_free(chan);
588 channel_backend_free(&chan->backend);
589 kfree(chan);
590 }
591
592 /**
593 * channel_create - Create channel.
594 * @config: ring buffer instance configuration
595 * @name: name of the channel
596 * @priv: ring buffer client private data
597 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
598 * address mapping. It is used only by RING_BUFFER_STATIC
599 * configuration. It can be set to NULL for other backends.
600 * @subbuf_size: subbuffer size
601 * @num_subbuf: number of subbuffers
602 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
603 * padding to let readers get those sub-buffers.
604 * Used for live streaming.
605 * @read_timer_interval: Time interval (in us) to wake up pending readers.
606 *
607 * Holds cpu hotplug.
608 * Returns NULL on failure.
609 */
610 struct channel *channel_create(const struct lib_ring_buffer_config *config,
611 const char *name, void *priv, void *buf_addr,
612 size_t subbuf_size,
613 size_t num_subbuf, unsigned int switch_timer_interval,
614 unsigned int read_timer_interval)
615 {
616 int ret, cpu;
617 struct channel *chan;
618
619 if (lib_ring_buffer_check_config(config, switch_timer_interval,
620 read_timer_interval))
621 return NULL;
622
623 chan = kzalloc(sizeof(struct channel), GFP_KERNEL);
624 if (!chan)
625 return NULL;
626
627 ret = channel_backend_init(&chan->backend, name, config, priv,
628 subbuf_size, num_subbuf);
629 if (ret)
630 goto error;
631
632 ret = channel_iterator_init(chan);
633 if (ret)
634 goto error_free_backend;
635
636 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
637 chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
638 chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
639 kref_init(&chan->ref);
640 init_waitqueue_head(&chan->read_wait);
641 init_waitqueue_head(&chan->hp_wait);
642
643 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
644 #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
645 /* Only benefit from NO_HZ idle with per-cpu buffers for now. */
646 chan->tick_nohz_notifier.notifier_call =
647 ring_buffer_tick_nohz_callback;
648 chan->tick_nohz_notifier.priority = ~0U;
649 atomic_notifier_chain_register(&tick_nohz_notifier,
650 &chan->tick_nohz_notifier);
651 #endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
652
653 /*
654 * In the non-hotplug cpu case, if the ring-buffer is allocated
655 * in an early initcall, it will not be notified of secondary cpus.
656 * In that case, we need to handle all possible cpus here.
657 */
658 #ifdef CONFIG_HOTPLUG_CPU
659 chan->cpu_hp_notifier.notifier_call =
660 lib_ring_buffer_cpu_hp_callback;
661 chan->cpu_hp_notifier.priority = 6;
662 register_cpu_notifier(&chan->cpu_hp_notifier);
663
664 get_online_cpus();
665 for_each_online_cpu(cpu) {
666 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
667 cpu);
668 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
669 lib_ring_buffer_start_switch_timer(buf);
670 lib_ring_buffer_start_read_timer(buf);
671 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
672 }
673 chan->cpu_hp_enable = 1;
674 put_online_cpus();
675 #else
676 for_each_possible_cpu(cpu) {
677 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
678 cpu);
679 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
680 lib_ring_buffer_start_switch_timer(buf);
681 lib_ring_buffer_start_read_timer(buf);
682 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
683 }
684 #endif
685 } else {
686 struct lib_ring_buffer *buf = chan->backend.buf;
687
688 lib_ring_buffer_start_switch_timer(buf);
689 lib_ring_buffer_start_read_timer(buf);
690 }
691
692 return chan;
693
694 error_free_backend:
695 channel_backend_free(&chan->backend);
696 error:
697 kfree(chan);
698 return NULL;
699 }
700 EXPORT_SYMBOL_GPL(channel_create);
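
/*
 * Illustrative usage sketch (editorial addition, not part of the original
 * file): a client pairs channel_create() with channel_destroy(). The
 * identifiers and values below ("client_config", "client_priv", "my-chan",
 * 8 sub-buffers of 4096 bytes, a 1 s switch timer, no read timer) are
 * placeholders chosen for the example, not values required by this API.
 *
 *	struct channel *chan;
 *	void *priv;
 *
 *	chan = channel_create(&client_config, "my-chan", client_priv, NULL,
 *			      4096, 8, 1000000, 0);
 *	if (!chan)
 *		return -ENOMEM;
 *	... tracing runs, clients reserve/commit records ...
 *	priv = channel_destroy(chan);
 */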
701
702 static
703 void channel_release(struct kref *kref)
704 {
705 struct channel *chan = container_of(kref, struct channel, ref);
706 channel_free(chan);
707 }
708
709 /**
710 * channel_destroy - Finalize, wait for q.s. and destroy channel.
711 * @chan: channel to destroy
712 *
713 * Holds cpu hotplug.
714 * Call "destroy" callback, finalize channels, and then decrement the
715 * channel reference count. Note that when readers have completed data
716 * consumption of finalized channels, get_subbuf() will return -ENODATA.
717 * They should release their handle at that point. Returns the private
718 * data pointer.
719 */
720 void *channel_destroy(struct channel *chan)
721 {
722 int cpu;
723 const struct lib_ring_buffer_config *config = &chan->backend.config;
724 void *priv;
725
726 channel_unregister_notifiers(chan);
727
728 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
729 /*
730 * No need to hold cpu hotplug, because all notifiers have been
731 * unregistered.
732 */
733 for_each_channel_cpu(cpu, chan) {
734 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
735 cpu);
736
737 if (config->cb.buffer_finalize)
738 config->cb.buffer_finalize(buf,
739 chan->backend.priv,
740 cpu);
741 if (buf->backend.allocated)
742 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
743 /*
744 * Perform flush before writing to finalized.
745 */
746 smp_wmb();
747 ACCESS_ONCE(buf->finalized) = 1;
748 wake_up_interruptible(&buf->read_wait);
749 }
750 } else {
751 struct lib_ring_buffer *buf = chan->backend.buf;
752
753 if (config->cb.buffer_finalize)
754 config->cb.buffer_finalize(buf, chan->backend.priv, -1);
755 if (buf->backend.allocated)
756 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
757 /*
758 * Perform flush before writing to finalized.
759 */
760 smp_wmb();
761 ACCESS_ONCE(buf->finalized) = 1;
762 wake_up_interruptible(&buf->read_wait);
763 }
764 ACCESS_ONCE(chan->finalized) = 1;
765 wake_up_interruptible(&chan->hp_wait);
766 wake_up_interruptible(&chan->read_wait);
767 priv = chan->backend.priv;
768 kref_put(&chan->ref, channel_release);
769 return priv;
770 }
771 EXPORT_SYMBOL_GPL(channel_destroy);
772
773 struct lib_ring_buffer *channel_get_ring_buffer(
774 const struct lib_ring_buffer_config *config,
775 struct channel *chan, int cpu)
776 {
777 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
778 return chan->backend.buf;
779 else
780 return per_cpu_ptr(chan->backend.buf, cpu);
781 }
782 EXPORT_SYMBOL_GPL(channel_get_ring_buffer);
783
784 int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
785 {
786 struct channel *chan = buf->backend.chan;
787
788 if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
789 return -EBUSY;
790 kref_get(&chan->ref);
791 smp_mb__after_atomic_inc();
792 return 0;
793 }
794 EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
795
796 void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
797 {
798 struct channel *chan = buf->backend.chan;
799
800 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
801 smp_mb__before_atomic_dec();
802 atomic_long_dec(&buf->active_readers);
803 kref_put(&chan->ref, channel_release);
804 }
805 EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read);
806
807 /*
808 * Promote compiler barrier to a smp_mb().
809 * For the specific ring buffer case, this IPI call should be removed if the
810 * architecture does not reorder writes. This should eventually be provided by
811 * a separate architecture-specific infrastructure.
812 */
813 static void remote_mb(void *info)
814 {
815 smp_mb();
816 }
817
818 /**
819 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
820 * @buf: ring buffer
821 * @consumed: consumed count indicating the position where to read
822 * @produced: produced count, indicates the position where to stop reading
823 *
824 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
825 * data to read at the consumed position, or 0 if the snapshot succeeds.
826 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
827 */
828
829 int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
830 unsigned long *consumed, unsigned long *produced)
831 {
832 struct channel *chan = buf->backend.chan;
833 const struct lib_ring_buffer_config *config = &chan->backend.config;
834 unsigned long consumed_cur, write_offset;
835 int finalized;
836
837 retry:
838 finalized = ACCESS_ONCE(buf->finalized);
839 /*
840 * Read finalized before counters.
841 */
842 smp_rmb();
843 consumed_cur = atomic_long_read(&buf->consumed);
844 /*
845 * No need to issue a memory barrier between consumed count read and
846 * write offset read, because consumed count can only change
847 * concurrently in overwrite mode, and we keep a sequence counter
848 * identifier derived from the write offset to check we are getting
849 * the same sub-buffer we are expecting (the sub-buffers are atomically
850 * "tagged" upon writes, tags are checked upon read).
851 */
852 write_offset = v_read(config, &buf->offset);
853
854 /*
855 * Check that we are not about to read the same subbuffer in
856 * which the writer head is.
857 */
858 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
859 == 0)
860 goto nodata;
861
862 *consumed = consumed_cur;
863 *produced = subbuf_trunc(write_offset, chan);
864
865 return 0;
866
867 nodata:
868 /*
869 * The memory barriers __wait_event()/wake_up_interruptible() take care
870 * of "raw_spin_is_locked" memory ordering.
871 */
872 if (finalized)
873 return -ENODATA;
874 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
875 goto retry;
876 else
877 return -EAGAIN;
878 }
879 EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot);
880
881 /**
882 * lib_ring_buffer_move_consumer - move consumed counter forward
883 * @buf: ring buffer
884 * @consumed_new: new consumed count value
885 *
886 * Should only be called from consumer context.
887 */
888 void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
889 unsigned long consumed_new)
890 {
891 struct lib_ring_buffer_backend *bufb = &buf->backend;
892 struct channel *chan = bufb->chan;
893 unsigned long consumed;
894
895 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
896
897 /*
898 * Only push the consumed value forward.
899 * If the consumed cmpxchg fails, this is because we have been pushed by
900 * the writer in flight recorder mode.
901 */
902 consumed = atomic_long_read(&buf->consumed);
903 while ((long) consumed - (long) consumed_new < 0)
904 consumed = atomic_long_cmpxchg(&buf->consumed, consumed,
905 consumed_new);
906 /* Wake-up the metadata producer */
907 wake_up_interruptible(&buf->write_wait);
908 }
909 EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer);
910
911 /**
912 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
913 * @buf: ring buffer
914 * @consumed: consumed count indicating the position where to read
915 *
916 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
917 * data to read at consumed position, or 0 if the get operation succeeds.
918 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
919 */
920 int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
921 unsigned long consumed)
922 {
923 struct channel *chan = buf->backend.chan;
924 const struct lib_ring_buffer_config *config = &chan->backend.config;
925 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
926 int ret;
927 int finalized;
928
929 if (buf->get_subbuf) {
930 /*
931 * Reader is trying to get a subbuffer twice.
932 */
933 CHAN_WARN_ON(chan, 1);
934 return -EBUSY;
935 }
936 retry:
937 finalized = ACCESS_ONCE(buf->finalized);
938 /*
939 * Read finalized before counters.
940 */
941 smp_rmb();
942 consumed_cur = atomic_long_read(&buf->consumed);
943 consumed_idx = subbuf_index(consumed, chan);
944 commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
945 /*
946 * Make sure we read the commit count before reading the buffer
947 * data and the write offset. Correct consumed offset ordering
948 * wrt commit count is ensured by the use of cmpxchg to update
949 * the consumed offset.
950 * smp_call_function_single can fail if the remote CPU is offline;
951 * this is OK because then there is no wmb to execute there.
952 * If our thread is executing on the same CPU as the one the buffer
953 * belongs to, we don't have to synchronize it at all. If we are
954 * migrated, the scheduler will take care of the memory barriers.
955 * Normally, smp_call_function_single() should ensure program order when
956 * executing the remote function, which implies that it surrounds the
957 * function execution with :
958 * smp_mb()
959 * send IPI
960 * csd_lock_wait
961 * recv IPI
962 * smp_mb()
963 * exec. function
964 * smp_mb()
965 * csd unlock
966 * smp_mb()
967 *
968 * However, smp_call_function_single() does not seem to clearly execute
969 * such barriers. It depends on spinlock semantic to provide the barrier
970 * before executing the IPI and, when busy-looping, csd_lock_wait only
971 * executes smp_mb() when it has to wait for the other CPU.
972 *
973 * I don't trust this code. Therefore, let's add the smp_mb() sequence
974 * required ourselves, even if duplicated. It has no performance impact
975 * anyway.
976 *
977 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
978 * read and write vs write. They do not ensure core synchronization. We
979 * really have to ensure total order between the 3 barriers running on
980 * the 2 CPUs.
981 */
982 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
983 if (config->sync == RING_BUFFER_SYNC_PER_CPU
984 && config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
985 if (raw_smp_processor_id() != buf->backend.cpu) {
986 /* Total order with IPI handler smp_mb() */
987 smp_mb();
988 smp_call_function_single(buf->backend.cpu,
989 remote_mb, NULL, 1);
990 /* Total order with IPI handler smp_mb() */
991 smp_mb();
992 }
993 } else {
994 /* Total order with IPI handler smp_mb() */
995 smp_mb();
996 smp_call_function(remote_mb, NULL, 1);
997 /* Total order with IPI handler smp_mb() */
998 smp_mb();
999 }
1000 } else {
1001 /*
1002 * Local rmb to match the remote wmb to read the commit count
1003 * before the buffer data and the write offset.
1004 */
1005 smp_rmb();
1006 }
1007
1008 write_offset = v_read(config, &buf->offset);
1009
1010 /*
1011 * Check that the buffer we are getting is after or at consumed_cur
1012 * position.
1013 */
1014 if ((long) subbuf_trunc(consumed, chan)
1015 - (long) subbuf_trunc(consumed_cur, chan) < 0)
1016 goto nodata;
1017
1018 /*
1019 * Check that the subbuffer we are trying to consume has been
1020 * already fully committed.
1021 */
1022 if (((commit_count - chan->backend.subbuf_size)
1023 & chan->commit_count_mask)
1024 - (buf_trunc(consumed_cur, chan)
1025 >> chan->backend.num_subbuf_order)
1026 != 0)
1027 goto nodata;
1028
1029 /*
1030 * Check that we are not about to read the same subbuffer in
1031 * which the writer head is.
1032 */
1033 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
1034 == 0)
1035 goto nodata;
1036
1037 /*
1038 * Failure to get the subbuffer causes a busy-loop retry without going
1039 * to a wait queue. These are caused by short-lived race windows where
1040 * the writer is getting access to a subbuffer we were trying to get
1041 * access to. Also checks that the "consumed" buffer count we are
1042 * looking for matches the one contained in the subbuffer id.
1043 */
1044 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
1045 consumed_idx, buf_trunc_val(consumed, chan));
1046 if (ret)
1047 goto retry;
1048 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
1049
1050 buf->get_subbuf_consumed = consumed;
1051 buf->get_subbuf = 1;
1052
1053 return 0;
1054
1055 nodata:
1056 /*
1057 * The memory barriers __wait_event()/wake_up_interruptible() take care
1058 * of "raw_spin_is_locked" memory ordering.
1059 */
1060 if (finalized)
1061 return -ENODATA;
1062 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
1063 goto retry;
1064 else
1065 return -EAGAIN;
1066 }
1067 EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf);
1068
1069 /**
1070 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
1071 * @buf: ring buffer
1072 */
1073 void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
1074 {
1075 struct lib_ring_buffer_backend *bufb = &buf->backend;
1076 struct channel *chan = bufb->chan;
1077 const struct lib_ring_buffer_config *config = &chan->backend.config;
1078 unsigned long read_sb_bindex, consumed_idx, consumed;
1079
1080 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
1081
1082 if (!buf->get_subbuf) {
1083 /*
1084 * Reader puts a subbuffer it did not get.
1085 */
1086 CHAN_WARN_ON(chan, 1);
1087 return;
1088 }
1089 consumed = buf->get_subbuf_consumed;
1090 buf->get_subbuf = 0;
1091
1092 /*
1093 * Clear the records_unread counter. (overruns counter)
1094 * Can still be non-zero if a file reader simply grabbed the data
1095 * without using iterators.
1096 * Can be below zero if an iterator is used on a snapshot more than
1097 * once.
1098 */
1099 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
1100 v_add(config, v_read(config,
1101 &bufb->array[read_sb_bindex]->records_unread),
1102 &bufb->records_read);
1103 v_set(config, &bufb->array[read_sb_bindex]->records_unread, 0);
1104 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
1105 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
1106 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
1107
1108 /*
1109 * Exchange the reader subbuffer with the one we put in its place in the
1110 * writer subbuffer table. Expect the original consumed count. If
1111 * update_read_sb_index fails, this is because the writer updated the
1112 * subbuffer concurrently. We should therefore keep the subbuffer we
1113 * currently have: it has become invalid to try reading this sub-buffer
1114 * consumed count value anyway.
1115 */
1116 consumed_idx = subbuf_index(consumed, chan);
1117 update_read_sb_index(config, &buf->backend, &chan->backend,
1118 consumed_idx, buf_trunc_val(consumed, chan));
1119 /*
1120 * update_read_sb_index return value ignored. Don't exchange sub-buffer
1121 * if the writer concurrently updated it.
1122 */
1123 }
1124 EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf);
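
/*
 * Illustrative consumer loop (editorial sketch, not part of the original
 * file) combining the read-side entry points defined above. "buf" and
 * "chan" are assumed to be an already-created ring buffer and its channel;
 * error handling is elided, and the actual extraction of the sub-buffer
 * payload (splice, mmap or iterator) is left as "..." since it belongs to
 * the backend/iterator code.
 *
 *	unsigned long consumed, produced;
 *
 *	if (lib_ring_buffer_open_read(buf))
 *		return -EBUSY;
 *	while (!lib_ring_buffer_snapshot(buf, &consumed, &produced)) {
 *		while (consumed < produced) {
 *			if (lib_ring_buffer_get_subbuf(buf, consumed))
 *				break;
 *			... read one sub-buffer worth of data ...
 *			lib_ring_buffer_put_subbuf(buf);
 *			consumed = subbuf_align(consumed, chan);
 *			lib_ring_buffer_move_consumer(buf, consumed);
 *		}
 *	}
 *	lib_ring_buffer_release_read(buf);
 */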
1125
1126 /*
1127 * cons_offset is an iterator on all subbuffer offsets between the reader
1128 * position and the writer position. (inclusive)
1129 */
1130 static
1131 void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
1132 struct channel *chan,
1133 unsigned long cons_offset,
1134 int cpu)
1135 {
1136 const struct lib_ring_buffer_config *config = &chan->backend.config;
1137 unsigned long cons_idx, commit_count, commit_count_sb;
1138
1139 cons_idx = subbuf_index(cons_offset, chan);
1140 commit_count = v_read(config, &buf->commit_hot[cons_idx].cc);
1141 commit_count_sb = v_read(config, &buf->commit_cold[cons_idx].cc_sb);
1142
1143 if (subbuf_offset(commit_count, chan) != 0)
1144 printk(KERN_WARNING
1145 "ring buffer %s, cpu %d: "
1146 "commit count in subbuffer %lu,\n"
1147 "expecting multiples of %lu bytes\n"
1148 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
1149 chan->backend.name, cpu, cons_idx,
1150 chan->backend.subbuf_size,
1151 commit_count, commit_count_sb);
1152
1153 printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n",
1154 chan->backend.name, cpu, commit_count);
1155 }
1156
1157 static
1158 void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
1159 struct channel *chan,
1160 void *priv, int cpu)
1161 {
1162 const struct lib_ring_buffer_config *config = &chan->backend.config;
1163 unsigned long write_offset, cons_offset;
1164
1165 /*
1166 * No need to order commit_count, write_offset and cons_offset reads
1167 * because we execute at teardown when no more writer nor reader
1168 * references are left.
1169 */
1170 write_offset = v_read(config, &buf->offset);
1171 cons_offset = atomic_long_read(&buf->consumed);
1172 if (write_offset != cons_offset)
1173 printk(KERN_DEBUG
1174 "ring buffer %s, cpu %d: "
1175 "non-consumed data\n"
1176 " [ %lu bytes written, %lu bytes read ]\n",
1177 chan->backend.name, cpu, write_offset, cons_offset);
1178
1179 for (cons_offset = atomic_long_read(&buf->consumed);
1180 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1181 chan)
1182 - cons_offset) > 0;
1183 cons_offset = subbuf_align(cons_offset, chan))
1184 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1185 cpu);
1186 }
1187
1188 static
1189 void lib_ring_buffer_print_errors(struct channel *chan,
1190 struct lib_ring_buffer *buf, int cpu)
1191 {
1192 const struct lib_ring_buffer_config *config = &chan->backend.config;
1193 void *priv = chan->backend.priv;
1194
1195 if (!strcmp(chan->backend.name, "relay-metadata")) {
1196 printk(KERN_DEBUG "ring buffer %s: %lu records written, "
1197 "%lu records overrun\n",
1198 chan->backend.name,
1199 v_read(config, &buf->records_count),
1200 v_read(config, &buf->records_overrun));
1201 } else {
1202 printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, "
1203 "%lu records overrun\n",
1204 chan->backend.name, cpu,
1205 v_read(config, &buf->records_count),
1206 v_read(config, &buf->records_overrun));
1207
1208 if (v_read(config, &buf->records_lost_full)
1209 || v_read(config, &buf->records_lost_wrap)
1210 || v_read(config, &buf->records_lost_big))
1211 printk(KERN_WARNING
1212 "ring buffer %s, cpu %d: records were lost. Caused by:\n"
1213 " [ %lu buffer full, %lu nest buffer wrap-around, "
1214 "%lu event too big ]\n",
1215 chan->backend.name, cpu,
1216 v_read(config, &buf->records_lost_full),
1217 v_read(config, &buf->records_lost_wrap),
1218 v_read(config, &buf->records_lost_big));
1219 }
1220 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
1221 }
1222
1223 /*
1224 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1225 *
1226 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1227 */
1228 static
1229 void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
1230 struct channel *chan,
1231 struct switch_offsets *offsets,
1232 u64 tsc)
1233 {
1234 const struct lib_ring_buffer_config *config = &chan->backend.config;
1235 unsigned long oldidx = subbuf_index(offsets->old, chan);
1236 unsigned long commit_count;
1237
1238 config->cb.buffer_begin(buf, tsc, oldidx);
1239
1240 /*
1241 * Order all writes to buffer before the commit count update that will
1242 * determine that the subbuffer is full.
1243 */
1244 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1245 /*
1246 * Must write slot data before incrementing commit count. This
1247 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1248 * by get_subbuf().
1249 */
1250 barrier();
1251 } else
1252 smp_wmb();
1253 v_add(config, config->cb.subbuffer_header_size(),
1254 &buf->commit_hot[oldidx].cc);
1255 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1256 /* Check if the written buffer has to be delivered */
1257 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1258 commit_count, oldidx);
1259 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1260 offsets->old, commit_count,
1261 config->cb.subbuffer_header_size());
1262 }
1263
1264 /*
1265 * lib_ring_buffer_switch_old_end: switch old subbuffer
1266 *
1267 * Note: offset_old should never be 0 here. It is OK, because we never perform
1268 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1269 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1270 * subbuffer.
1271 */
1272 static
1273 void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
1274 struct channel *chan,
1275 struct switch_offsets *offsets,
1276 u64 tsc)
1277 {
1278 const struct lib_ring_buffer_config *config = &chan->backend.config;
1279 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1280 unsigned long commit_count, padding_size, data_size;
1281
1282 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1283 padding_size = chan->backend.subbuf_size - data_size;
1284 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
1285
1286 /*
1287 * Order all writes to buffer before the commit count update that will
1288 * determine that the subbuffer is full.
1289 */
1290 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1291 /*
1292 * Must write slot data before incrementing commit count. This
1293 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1294 * by get_subbuf().
1295 */
1296 barrier();
1297 } else
1298 smp_wmb();
1299 v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
1300 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1301 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1302 commit_count, oldidx);
1303 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1304 offsets->old, commit_count,
1305 padding_size);
1306 }
1307
1308 /*
1309 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1310 *
1311 * This code can be executed unordered: writers may already have written to the
1312 * sub-buffer before this code gets executed, so use caution. The commit makes
1313 * sure that this code is executed before the delivery of this sub-buffer.
1314 */
1315 static
1316 void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
1317 struct channel *chan,
1318 struct switch_offsets *offsets,
1319 u64 tsc)
1320 {
1321 const struct lib_ring_buffer_config *config = &chan->backend.config;
1322 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1323 unsigned long commit_count;
1324
1325 config->cb.buffer_begin(buf, tsc, beginidx);
1326
1327 /*
1328 * Order all writes to buffer before the commit count update that will
1329 * determine that the subbuffer is full.
1330 */
1331 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1332 /*
1333 * Must write slot data before incrementing commit count. This
1334 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1335 * by get_subbuf().
1336 */
1337 barrier();
1338 } else
1339 smp_wmb();
1340 v_add(config, config->cb.subbuffer_header_size(),
1341 &buf->commit_hot[beginidx].cc);
1342 commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
1343 /* Check if the written buffer has to be delivered */
1344 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1345 commit_count, beginidx);
1346 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1347 offsets->begin, commit_count,
1348 config->cb.subbuffer_header_size());
1349 }
1350
1351 /*
1352 * Returns :
1353 * 0 if ok
1354 * !0 if execution must be aborted.
1355 */
1356 static
1357 int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1358 struct lib_ring_buffer *buf,
1359 struct channel *chan,
1360 struct switch_offsets *offsets,
1361 u64 *tsc)
1362 {
1363 const struct lib_ring_buffer_config *config = &chan->backend.config;
1364 unsigned long off;
1365
1366 offsets->begin = v_read(config, &buf->offset);
1367 offsets->old = offsets->begin;
1368 offsets->switch_old_start = 0;
1369 off = subbuf_offset(offsets->begin, chan);
1370
1371 *tsc = config->cb.ring_buffer_clock_read(chan);
1372
1373 /*
1374 * Ensure we flush the header of an empty subbuffer when doing the
1375 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1376 * total data gathering duration even if there were no records saved
1377 * after the last buffer switch.
1378 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1379 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1380 * subbuffer header as appropriate.
1381 * The next record that reserves space will be responsible for
1382 * populating the following subbuffer header. We choose not to populate
1383 * the next subbuffer header here because we want to be able to use
1384 * SWITCH_ACTIVE for periodical buffer flush and CPU tick_nohz stop
1385 * buffer flush, which must guarantee that all the buffer content
1386 * (records and header timestamps) are visible to the reader. This is
1387 * required for quiescence guarantees for the fusion merge.
1388 */
1389 if (mode == SWITCH_FLUSH || off > 0) {
1390 if (unlikely(off == 0)) {
1391 /*
1392 * A final flush that encounters an empty
1393 * sub-buffer cannot switch buffer if a
1394 * reader is located within this sub-buffer.
1395 * Anyway, the purpose of final flushing of a
1396 * sub-buffer at offset 0 is to handle the case
1397 * of an entirely empty stream.
1398 */
1399 if (unlikely(subbuf_trunc(offsets->begin, chan)
1400 - subbuf_trunc((unsigned long)
1401 atomic_long_read(&buf->consumed), chan)
1402 >= chan->backend.buf_size))
1403 return -1;
1404 /*
1405 * The client does not save any header information.
1406 * Don't switch empty subbuffer on finalize, because it
1407 * is invalid to deliver a completely empty subbuffer.
1408 */
1409 if (!config->cb.subbuffer_header_size())
1410 return -1;
1411 /*
1412 * Need to write the subbuffer start header on finalize.
1413 */
1414 offsets->switch_old_start = 1;
1415 }
1416 offsets->begin = subbuf_align(offsets->begin, chan);
1417 } else
1418 return -1; /* we do not have to switch : buffer is empty */
1419 /* Note: old points to the next subbuf at offset 0 */
1420 offsets->end = offsets->begin;
1421 return 0;
1422 }
1423
1424 /*
1425 * Force a sub-buffer switch. This operation is completely reentrant: it can be
1426 * called while tracing is active with absolutely no lock held.
1427 *
1428 * Note, however, that as a v_cmpxchg is used for some atomic
1429 * operations, this function must be called from the CPU which owns the buffer
1430 * for an ACTIVE flush.
1431 */
1432 void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
1433 {
1434 struct channel *chan = buf->backend.chan;
1435 const struct lib_ring_buffer_config *config = &chan->backend.config;
1436 struct switch_offsets offsets;
1437 unsigned long oldidx;
1438 u64 tsc;
1439
1440 offsets.size = 0;
1441
1442 /*
1443 * Perform retryable operations.
1444 */
1445 do {
1446 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1447 &tsc))
1448 return; /* Switch not needed */
1449 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1450 != offsets.old);
1451
1452 /*
1453 * Atomically update last_tsc. This update races against concurrent
1454 * atomic updates, but the race will always cause supplementary full TSC
1455 * records, never the opposite (missing a full TSC record when it would
1456 * be needed).
1457 */
1458 save_last_tsc(config, buf, tsc);
1459
1460 /*
1461 * Push the reader if necessary
1462 */
1463 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1464
1465 oldidx = subbuf_index(offsets.old, chan);
1466 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
1467
1468 /*
1469 * May need to populate header start on SWITCH_FLUSH.
1470 */
1471 if (offsets.switch_old_start) {
1472 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
1473 offsets.old += config->cb.subbuffer_header_size();
1474 }
1475
1476 /*
1477 * Switch old subbuffer.
1478 */
1479 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
1480 }
1481 EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
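
/*
 * For reference (editorial note): within this file, SWITCH_ACTIVE is used by
 * the periodic switch timer, the tick_nohz callback and the CPU_DEAD hotplug
 * path to flush a buffer that is still being written to, while SWITCH_FLUSH
 * is used by channel_destroy() to force out the last, possibly empty,
 * sub-buffer at finalize time:
 *
 *	lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);  (periodic/nohz flush)
 *	lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);   (teardown/finalize)
 */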
1482
1483 /*
1484 * Returns :
1485 * 0 if ok
1486 * -ENOSPC if event size is too large for packet.
1487 * -ENOBUFS if there is currently not enough space in buffer for the event.
1488 * -EIO if data cannot be written into the buffer for any other reason.
1489 */
1490 static
1491 int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
1492 struct channel *chan,
1493 struct switch_offsets *offsets,
1494 struct lib_ring_buffer_ctx *ctx)
1495 {
1496 const struct lib_ring_buffer_config *config = &chan->backend.config;
1497 unsigned long reserve_commit_diff;
1498
1499 offsets->begin = v_read(config, &buf->offset);
1500 offsets->old = offsets->begin;
1501 offsets->switch_new_start = 0;
1502 offsets->switch_old_end = 0;
1503 offsets->pre_header_padding = 0;
1504
1505 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1506 if ((int64_t) ctx->tsc == -EIO)
1507 return -EIO;
1508
1509 if (last_tsc_overflow(config, buf, ctx->tsc))
1510 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1511
1512 if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1513 offsets->switch_new_start = 1; /* For offsets->begin */
1514 } else {
1515 offsets->size = config->cb.record_header_size(config, chan,
1516 offsets->begin,
1517 &offsets->pre_header_padding,
1518 ctx);
1519 offsets->size +=
1520 lib_ring_buffer_align(offsets->begin + offsets->size,
1521 ctx->largest_align)
1522 + ctx->data_size;
1523 if (unlikely(subbuf_offset(offsets->begin, chan) +
1524 offsets->size > chan->backend.subbuf_size)) {
1525 offsets->switch_old_end = 1; /* For offsets->old */
1526 offsets->switch_new_start = 1; /* For offsets->begin */
1527 }
1528 }
1529 if (unlikely(offsets->switch_new_start)) {
1530 unsigned long sb_index;
1531
1532 /*
1533 * We are typically not filling the previous buffer completely.
1534 */
1535 if (likely(offsets->switch_old_end))
1536 offsets->begin = subbuf_align(offsets->begin, chan);
1537 offsets->begin = offsets->begin
1538 + config->cb.subbuffer_header_size();
1539 /* Test new buffer integrity */
1540 sb_index = subbuf_index(offsets->begin, chan);
1541 reserve_commit_diff =
1542 (buf_trunc(offsets->begin, chan)
1543 >> chan->backend.num_subbuf_order)
1544 - ((unsigned long) v_read(config,
1545 &buf->commit_cold[sb_index].cc_sb)
1546 & chan->commit_count_mask);
1547 if (likely(reserve_commit_diff == 0)) {
1548 /* Next subbuffer not being written to. */
1549 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1550 subbuf_trunc(offsets->begin, chan)
1551 - subbuf_trunc((unsigned long)
1552 atomic_long_read(&buf->consumed), chan)
1553 >= chan->backend.buf_size)) {
1554 /*
1555 * We do not overwrite non-consumed buffers
1556 * and we are full: record is lost.
1557 */
1558 v_inc(config, &buf->records_lost_full);
1559 return -ENOBUFS;
1560 } else {
1561 /*
1562 * Next subbuffer not being written to, and we
1563 * are either in overwrite mode or the buffer is
1564 * not full. It's safe to write in this new
1565 * subbuffer.
1566 */
1567 }
1568 } else {
1569 /*
1570 * Next subbuffer reserve offset does not match the
1571 * commit offset. Drop record in producer-consumer and
1572 * overwrite mode. Caused by either a writer OOPS or too
1573 * many nested writes over a reserve/commit pair.
1574 */
1575 v_inc(config, &buf->records_lost_wrap);
1576 return -EIO;
1577 }
1578 offsets->size =
1579 config->cb.record_header_size(config, chan,
1580 offsets->begin,
1581 &offsets->pre_header_padding,
1582 ctx);
1583 offsets->size +=
1584 lib_ring_buffer_align(offsets->begin + offsets->size,
1585 ctx->largest_align)
1586 + ctx->data_size;
1587 if (unlikely(subbuf_offset(offsets->begin, chan)
1588 + offsets->size > chan->backend.subbuf_size)) {
1589 /*
1590 * Record too big for subbuffers, report error, don't
1591 * complete the sub-buffer switch.
1592 */
1593 v_inc(config, &buf->records_lost_big);
1594 return -ENOSPC;
1595 } else {
1596 /*
1597 * We just made a successful buffer switch and the
1598 * record fits in the new subbuffer. Let's write.
1599 */
1600 }
1601 } else {
1602 /*
1603 * Record fits in the current buffer and we are not on a switch
1604 * boundary. It's safe to write.
1605 */
1606 }
1607 offsets->end = offsets->begin + offsets->size;
1608 return 0;
1609 }
1610
1611 /**
1612 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1613 * @ctx: ring buffer context.
1614 *
1615 * Return: -ENOBUFS if not enough space, -ENOSPC if event size too large,
1616 * -EIO for other errors, else returns 0.
1617 * It will take care of sub-buffer switching.
1618 */
1619 int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
1620 {
1621 struct channel *chan = ctx->chan;
1622 const struct lib_ring_buffer_config *config = &chan->backend.config;
1623 struct lib_ring_buffer *buf;
1624 struct switch_offsets offsets;
1625 int ret;
1626
1627 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1628 buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
1629 else
1630 buf = chan->backend.buf;
1631 ctx->buf = buf;
1632
1633 offsets.size = 0;
1634
1635 do {
1636 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1637 ctx);
1638 if (unlikely(ret))
1639 return ret;
1640 } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1641 offsets.end)
1642 != offsets.old));
1643
1644 /*
1645 * Atomically update last_tsc. This update races against concurrent
1646 * atomic updates, but the race will always cause supplementary full TSC
1647 * records, never the opposite (missing a full TSC record when it would
1648 * be needed).
1649 */
1650 save_last_tsc(config, buf, ctx->tsc);
1651
1652 /*
1653 * Push the reader if necessary
1654 */
1655 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1656
1657 /*
1658 * Clear noref flag for this subbuffer.
1659 */
1660 lib_ring_buffer_clear_noref(config, &buf->backend,
1661 subbuf_index(offsets.end - 1, chan));
1662
1663 /*
1664 * Switch old subbuffer if needed.
1665 */
1666 if (unlikely(offsets.switch_old_end)) {
1667 lib_ring_buffer_clear_noref(config, &buf->backend,
1668 subbuf_index(offsets.old - 1, chan));
1669 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
1670 }
1671
1672 /*
1673 * Populate new subbuffer.
1674 */
1675 if (unlikely(offsets.switch_new_start))
1676 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
1677
1678 ctx->slot_size = offsets.size;
1679 ctx->pre_offset = offsets.begin;
1680 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1681 return 0;
1682 }
1683 EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
1684
1685 int __init init_lib_ring_buffer_frontend(void)
1686 {
1687 int cpu;
1688
1689 for_each_possible_cpu(cpu)
1690 spin_lock_init(&per_cpu(ring_buffer_nohz_lock, cpu));
1691 return 0;
1692 }
1693
1694 module_init(init_lib_ring_buffer_frontend);
1695
1696 void __exit exit_lib_ring_buffer_frontend(void)
1697 {
1698 }
1699
1700 module_exit(exit_lib_ring_buffer_frontend);