1/*
2 * ring_buffer_frontend.c
3 *
4 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; only
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 *
21 * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
22 * recorder (overwrite) modes. See thesis:
23 *
24 * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
25 * dissertation, Ecole Polytechnique de Montreal.
26 * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
27 *
28 * - Algorithm presentation in Chapter 5:
29 * "Lockless Multi-Core High-Throughput Buffering".
30 * - Algorithm formal verification in Section 8.6:
31 * "Formal verification of LTTng"
32 *
33 * Author:
34 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
35 *
36 * Inspired from LTT and RelayFS:
37 * Karim Yaghmour <karim@opersys.com>
38 * Tom Zanussi <zanussi@us.ibm.com>
39 * Bob Wisniewski <bob@watson.ibm.com>
40 * And from K42 :
41 * Bob Wisniewski <bob@watson.ibm.com>
42 *
43 * Buffer reader semantic :
44 *
45 * - get_subbuf_size
46 * while buffer is not finalized and empty
47 * - get_subbuf
48 * - if return value != 0, continue
49 * - splice one subbuffer worth of data to a pipe
50 * - splice the data from pipe to disk/network
51 * - put_subbuf
52 */
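/*
 * Illustrative sketch only (not part of this file's API surface): a minimal
 * consumer loop following the reader semantic listed above. The splice step
 * is reduced to a hypothetical consume_subbuf() helper standing in for the
 * client's output path, and error handling is kept to the bare minimum.
 */
#if 0
static void example_read_loop(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	unsigned long consumed, produced;

	if (lib_ring_buffer_open_read(buf))
		return;
	/* Stops on -EAGAIN/-ENODATA; a real reader would wait and retry. */
	while (!lib_ring_buffer_snapshot(buf, &consumed, &produced)) {
		while (consumed < produced) {
			if (lib_ring_buffer_get_subbuf(buf, consumed))
				break;	/* sub-buffer still owned by the writer */
			consume_subbuf(buf);	/* hypothetical: splice data to pipe/disk */
			lib_ring_buffer_put_subbuf(buf);
			consumed = subbuf_align(consumed, chan);
			lib_ring_buffer_move_consumer(buf, consumed);
		}
	}
	lib_ring_buffer_release_read(buf);
}
#endif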
53
54#include <linux/delay.h>
55#include <linux/module.h>
56#include <linux/percpu.h>
57
58#include <wrapper/ringbuffer/config.h>
59#include <wrapper/ringbuffer/backend.h>
60#include <wrapper/ringbuffer/frontend.h>
61#include <wrapper/ringbuffer/iterator.h>
62#include <wrapper/ringbuffer/nohz.h>
63#include <wrapper/atomic.h>
64#include <wrapper/kref.h>
65#include <wrapper/percpu-defs.h>
66
67/*
68 * Internal structure representing offsets to use at a sub-buffer switch.
69 */
70struct switch_offsets {
71 unsigned long begin, end, old;
72 size_t pre_header_padding, size;
73 unsigned int switch_new_start:1, switch_new_end:1, switch_old_start:1,
74 switch_old_end:1;
75};
76
77#ifdef CONFIG_NO_HZ
78enum tick_nohz_val {
79 TICK_NOHZ_STOP,
80 TICK_NOHZ_FLUSH,
81 TICK_NOHZ_RESTART,
82};
83
84static ATOMIC_NOTIFIER_HEAD(tick_nohz_notifier);
85#endif /* CONFIG_NO_HZ */
86
87static DEFINE_PER_CPU(spinlock_t, ring_buffer_nohz_lock);
88
89DEFINE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
90EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
91
92static
93void lib_ring_buffer_print_errors(struct channel *chan,
94 struct lib_ring_buffer *buf, int cpu);
95
96/*
97 * Must be called under cpu hotplug protection.
98 */
99void lib_ring_buffer_free(struct lib_ring_buffer *buf)
100{
101 struct channel *chan = buf->backend.chan;
102
103 lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
104 kfree(buf->commit_hot);
105 kfree(buf->commit_cold);
106
107 lib_ring_buffer_backend_free(&buf->backend);
108}
109
110/**
111 * lib_ring_buffer_reset - Reset ring buffer to initial values.
112 * @buf: Ring buffer.
113 *
114 * Effectively empty the ring buffer. Should be called when the buffer is not
115 * used for writing. The ring buffer can be opened for reading, but the reader
116 * should not be using the iterator concurrently with reset. The previous
117 * current iterator record is reset.
118 */
119void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
120{
121 struct channel *chan = buf->backend.chan;
122 const struct lib_ring_buffer_config *config = &chan->backend.config;
123 unsigned int i;
124
125 /*
126 * Reset iterator first. It will put the subbuffer if it currently holds
127 * it.
128 */
129 lib_ring_buffer_iterator_reset(buf);
130 v_set(config, &buf->offset, 0);
131 for (i = 0; i < chan->backend.num_subbuf; i++) {
132 v_set(config, &buf->commit_hot[i].cc, 0);
133 v_set(config, &buf->commit_hot[i].seq, 0);
134 v_set(config, &buf->commit_cold[i].cc_sb, 0);
135 }
136 atomic_long_set(&buf->consumed, 0);
137 atomic_set(&buf->record_disabled, 0);
138 v_set(config, &buf->last_tsc, 0);
139 lib_ring_buffer_backend_reset(&buf->backend);
140 /* Don't reset number of active readers */
141 v_set(config, &buf->records_lost_full, 0);
142 v_set(config, &buf->records_lost_wrap, 0);
143 v_set(config, &buf->records_lost_big, 0);
144 v_set(config, &buf->records_count, 0);
145 v_set(config, &buf->records_overrun, 0);
146 buf->finalized = 0;
147}
148EXPORT_SYMBOL_GPL(lib_ring_buffer_reset);
149
150/**
151 * channel_reset - Reset channel to initial values.
152 * @chan: Channel.
153 *
154 * Effectively empty the channel. Should be called when the channel is not used
155 * for writing. The channel can be opened for reading, but the reader should not
156 * be using the iterator concurrently with reset. The previous current iterator
157 * record is reset.
158 */
159void channel_reset(struct channel *chan)
160{
161 /*
162 * Reset iterators first. Will put the subbuffer if held for reading.
163 */
164 channel_iterator_reset(chan);
165 atomic_set(&chan->record_disabled, 0);
166 /* Don't reset commit_count_mask, still valid */
167 channel_backend_reset(&chan->backend);
168 /* Don't reset switch/read timer interval */
169 /* Don't reset notifiers and notifier enable bits */
170 /* Don't reset reader reference count */
171}
172EXPORT_SYMBOL_GPL(channel_reset);
173
174/*
175 * Must be called under cpu hotplug protection.
176 */
177int lib_ring_buffer_create(struct lib_ring_buffer *buf,
178 struct channel_backend *chanb, int cpu)
179{
180 const struct lib_ring_buffer_config *config = &chanb->config;
181 struct channel *chan = container_of(chanb, struct channel, backend);
182 void *priv = chanb->priv;
183 size_t subbuf_header_size;
184 u64 tsc;
185 int ret;
186
187 /* Test for cpu hotplug */
188 if (buf->backend.allocated)
189 return 0;
190
191 /*
192 * Paranoia: per cpu dynamic allocation is not officially documented as
193 * zeroing the memory, so let's do it here too, just in case.
194 */
195 memset(buf, 0, sizeof(*buf));
196
197 ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu);
198 if (ret)
199 return ret;
200
201 buf->commit_hot =
202 kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
203 * chan->backend.num_subbuf,
204 1 << INTERNODE_CACHE_SHIFT),
205 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
206 if (!buf->commit_hot) {
207 ret = -ENOMEM;
208 goto free_chanbuf;
209 }
210
211 buf->commit_cold =
212 kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
213 * chan->backend.num_subbuf,
214 1 << INTERNODE_CACHE_SHIFT),
215 GFP_KERNEL, cpu_to_node(max(cpu, 0)));
216 if (!buf->commit_cold) {
217 ret = -ENOMEM;
218 goto free_commit;
219 }
220
221 init_waitqueue_head(&buf->read_wait);
222 init_waitqueue_head(&buf->write_wait);
223 raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
224
225 /*
226 * Write the subbuffer header for first subbuffer so we know the total
227 * duration of data gathering.
228 */
229 subbuf_header_size = config->cb.subbuffer_header_size();
230 v_set(config, &buf->offset, subbuf_header_size);
231 subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id);
232 tsc = config->cb.ring_buffer_clock_read(buf->backend.chan);
233 config->cb.buffer_begin(buf, tsc, 0);
234 v_add(config, subbuf_header_size, &buf->commit_hot[0].cc);
235
236 if (config->cb.buffer_create) {
237 ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
238 if (ret)
239 goto free_init;
240 }
241
242 /*
243 * Ensure the buffer is ready before setting it to allocated and setting
244 * the cpumask.
245 * Used for cpu hotplug vs cpumask iteration.
246 */
247 smp_wmb();
248 buf->backend.allocated = 1;
249
250 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
251 CHAN_WARN_ON(chan, cpumask_test_cpu(cpu,
252 chan->backend.cpumask));
253 cpumask_set_cpu(cpu, chan->backend.cpumask);
254 }
255
256 return 0;
257
258 /* Error handling */
259free_init:
260 kfree(buf->commit_cold);
261free_commit:
262 kfree(buf->commit_hot);
263free_chanbuf:
264 lib_ring_buffer_backend_free(&buf->backend);
265 return ret;
266}
267
268static void switch_buffer_timer(unsigned long data)
269{
270 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
271 struct channel *chan = buf->backend.chan;
272 const struct lib_ring_buffer_config *config = &chan->backend.config;
273
274 /*
275 * Only flush buffers periodically if readers are active.
276 */
277 if (atomic_long_read(&buf->active_readers))
278 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
279
280 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
281 mod_timer_pinned(&buf->switch_timer,
282 jiffies + chan->switch_timer_interval);
283 else
284 mod_timer(&buf->switch_timer,
285 jiffies + chan->switch_timer_interval);
286}
287
288/*
289 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
290 */
291static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
292{
293 struct channel *chan = buf->backend.chan;
294 const struct lib_ring_buffer_config *config = &chan->backend.config;
295
296 if (!chan->switch_timer_interval || buf->switch_timer_enabled)
297 return;
298 init_timer(&buf->switch_timer);
299 buf->switch_timer.function = switch_buffer_timer;
300 buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
301 buf->switch_timer.data = (unsigned long)buf;
302 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
303 add_timer_on(&buf->switch_timer, buf->backend.cpu);
304 else
305 add_timer(&buf->switch_timer);
306 buf->switch_timer_enabled = 1;
307}
308
309/*
310 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
311 */
312static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
313{
314 struct channel *chan = buf->backend.chan;
315
316 if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
317 return;
318
319 del_timer_sync(&buf->switch_timer);
320 buf->switch_timer_enabled = 0;
321}
322
323/*
324 * Polling timer to check the channels for data.
325 */
326static void read_buffer_timer(unsigned long data)
327{
328 struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
329 struct channel *chan = buf->backend.chan;
330 const struct lib_ring_buffer_config *config = &chan->backend.config;
331
332 CHAN_WARN_ON(chan, !buf->backend.allocated);
333
334 if (atomic_long_read(&buf->active_readers)
335 && lib_ring_buffer_poll_deliver(config, buf, chan)) {
336 wake_up_interruptible(&buf->read_wait);
337 wake_up_interruptible(&chan->read_wait);
338 }
339
340 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
341 mod_timer_pinned(&buf->read_timer,
342 jiffies + chan->read_timer_interval);
343 else
344 mod_timer(&buf->read_timer,
345 jiffies + chan->read_timer_interval);
346}
347
348/*
349 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
350 */
351static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
352{
353 struct channel *chan = buf->backend.chan;
354 const struct lib_ring_buffer_config *config = &chan->backend.config;
355
356 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
357 || !chan->read_timer_interval
358 || buf->read_timer_enabled)
359 return;
360
361 init_timer(&buf->read_timer);
362 buf->read_timer.function = read_buffer_timer;
363 buf->read_timer.expires = jiffies + chan->read_timer_interval;
364 buf->read_timer.data = (unsigned long)buf;
365
366 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
367 add_timer_on(&buf->read_timer, buf->backend.cpu);
368 else
369 add_timer(&buf->read_timer);
370 buf->read_timer_enabled = 1;
371}
372
373/*
374 * Called with ring_buffer_nohz_lock held for per-cpu buffers.
375 */
376static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
377{
378 struct channel *chan = buf->backend.chan;
379 const struct lib_ring_buffer_config *config = &chan->backend.config;
380
381 if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
382 || !chan->read_timer_interval
383 || !buf->read_timer_enabled)
384 return;
385
386 del_timer_sync(&buf->read_timer);
387 /*
388 * do one more check to catch data that has been written in the last
389 * timer period.
390 */
391 if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
392 wake_up_interruptible(&buf->read_wait);
393 wake_up_interruptible(&chan->read_wait);
394 }
395 buf->read_timer_enabled = 0;
396}
397
398#ifdef CONFIG_HOTPLUG_CPU
399/**
400 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
401 * @nb: notifier block
402 * @action: hotplug action to take
403 * @hcpu: CPU number
404 *
405 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
406 */
407static
408int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
409 unsigned long action,
410 void *hcpu)
411{
412 unsigned int cpu = (unsigned long)hcpu;
413 struct channel *chan = container_of(nb, struct channel,
414 cpu_hp_notifier);
415 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
416 const struct lib_ring_buffer_config *config = &chan->backend.config;
417
418 if (!chan->cpu_hp_enable)
419 return NOTIFY_DONE;
420
421 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
422
423 switch (action) {
424 case CPU_DOWN_FAILED:
425 case CPU_DOWN_FAILED_FROZEN:
426 case CPU_ONLINE:
427 case CPU_ONLINE_FROZEN:
428 wake_up_interruptible(&chan->hp_wait);
429 lib_ring_buffer_start_switch_timer(buf);
430 lib_ring_buffer_start_read_timer(buf);
431 return NOTIFY_OK;
432
433 case CPU_DOWN_PREPARE:
434 case CPU_DOWN_PREPARE_FROZEN:
435 lib_ring_buffer_stop_switch_timer(buf);
436 lib_ring_buffer_stop_read_timer(buf);
437 return NOTIFY_OK;
438
439 case CPU_DEAD:
440 case CPU_DEAD_FROZEN:
441 /*
442 * Performing a buffer switch on a remote CPU. Performed by
443 * the CPU responsible for doing the hotunplug after the target
444 * CPU stopped running completely. Ensures that all data
445 * from that remote CPU is flushed.
446 */
447 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
448 return NOTIFY_OK;
449
450 default:
451 return NOTIFY_DONE;
452 }
453}
454#endif
455
456#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
457/*
458 * For per-cpu buffers, call the reader wakeups before switching the buffer, so
459 * that wake-up-tracing generated events are flushed before going idle (in
460 * tick_nohz). We test if the spinlock is locked to deal with the race where
461 * readers try to sample the ring buffer before we perform the switch. We let
462 * the readers retry in that case. If there is data in the buffer, the wake up
463 * is going to forbid the CPU running the reader thread from going idle.
464 */
465static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb,
466 unsigned long val,
467 void *data)
468{
469 struct channel *chan = container_of(nb, struct channel,
470 tick_nohz_notifier);
471 const struct lib_ring_buffer_config *config = &chan->backend.config;
472 struct lib_ring_buffer *buf;
473 int cpu = smp_processor_id();
474
475 if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
476 /*
477 * We don't support keeping the system idle with global buffers
478 * and streaming active. In order to do so, we would need to
479 * sample a non-nohz-cpumask racelessly with the nohz updates
480 * without adding synchronization overhead to nohz. Leave this
481 * use-case out for now.
482 */
483 return 0;
484 }
485
486 buf = channel_get_ring_buffer(config, chan, cpu);
487 switch (val) {
488 case TICK_NOHZ_FLUSH:
489 raw_spin_lock(&buf->raw_tick_nohz_spinlock);
490 if (config->wakeup == RING_BUFFER_WAKEUP_BY_TIMER
491 && chan->read_timer_interval
492 && atomic_long_read(&buf->active_readers)
493 && (lib_ring_buffer_poll_deliver(config, buf, chan)
494 || lib_ring_buffer_pending_data(config, buf, chan))) {
495 wake_up_interruptible(&buf->read_wait);
496 wake_up_interruptible(&chan->read_wait);
497 }
498 if (chan->switch_timer_interval)
499 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
500 raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
501 break;
502 case TICK_NOHZ_STOP:
503 spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
504 lib_ring_buffer_stop_switch_timer(buf);
505 lib_ring_buffer_stop_read_timer(buf);
506 spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
507 break;
508 case TICK_NOHZ_RESTART:
509 spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
510 lib_ring_buffer_start_read_timer(buf);
511 lib_ring_buffer_start_switch_timer(buf);
512 spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
513 break;
514 }
515
516 return 0;
517}
518
519void notrace lib_ring_buffer_tick_nohz_flush(void)
520{
521 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_FLUSH,
522 NULL);
523}
524
525void notrace lib_ring_buffer_tick_nohz_stop(void)
526{
527 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_STOP,
528 NULL);
529}
530
531void notrace lib_ring_buffer_tick_nohz_restart(void)
532{
533 atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_RESTART,
534 NULL);
535}
536#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
537
538/*
539 * Holds CPU hotplug.
540 */
541static void channel_unregister_notifiers(struct channel *chan)
542{
543 const struct lib_ring_buffer_config *config = &chan->backend.config;
544 int cpu;
545
546 channel_iterator_unregister_notifiers(chan);
547 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
548#ifdef CONFIG_NO_HZ
549 /*
550 * Remove the nohz notifier first, so we are certain we stop
551 * the timers.
552 */
553 atomic_notifier_chain_unregister(&tick_nohz_notifier,
554 &chan->tick_nohz_notifier);
555 /*
556 * ring_buffer_nohz_lock will not be needed below, because
557 * we just removed the notifiers, which were the only source of
558 * concurrency.
559 */
560#endif /* CONFIG_NO_HZ */
561#ifdef CONFIG_HOTPLUG_CPU
562 get_online_cpus();
563 chan->cpu_hp_enable = 0;
564 for_each_online_cpu(cpu) {
565 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
566 cpu);
567 lib_ring_buffer_stop_switch_timer(buf);
568 lib_ring_buffer_stop_read_timer(buf);
569 }
570 put_online_cpus();
571 unregister_cpu_notifier(&chan->cpu_hp_notifier);
572#else
573 for_each_possible_cpu(cpu) {
574 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
575 cpu);
576 lib_ring_buffer_stop_switch_timer(buf);
577 lib_ring_buffer_stop_read_timer(buf);
578 }
579#endif
580 } else {
581 struct lib_ring_buffer *buf = chan->backend.buf;
582
583 lib_ring_buffer_stop_switch_timer(buf);
584 lib_ring_buffer_stop_read_timer(buf);
585 }
586 channel_backend_unregister_notifiers(&chan->backend);
587}
588
589static void channel_free(struct channel *chan)
590{
591 if (chan->backend.release_priv_ops) {
592 chan->backend.release_priv_ops(chan->backend.priv_ops);
593 }
594 channel_iterator_free(chan);
595 channel_backend_free(&chan->backend);
596 kfree(chan);
597}
598
599/**
600 * channel_create - Create channel.
601 * @config: ring buffer instance configuration
602 * @name: name of the channel
603 * @priv: ring buffer client private data
604 * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
605 * address mapping. It is used only by RING_BUFFER_STATIC
606 * configuration. It can be set to NULL for other backends.
607 * @subbuf_size: subbuffer size
608 * @num_subbuf: number of subbuffers
609 * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
610 * padding to let readers get those sub-buffers.
611 * Used for live streaming.
612 * @read_timer_interval: Time interval (in us) to wake up pending readers.
613 *
614 * Holds cpu hotplug.
615 * Returns NULL on failure.
616 */
617struct channel *channel_create(const struct lib_ring_buffer_config *config,
618 const char *name, void *priv, void *buf_addr,
619 size_t subbuf_size,
620 size_t num_subbuf, unsigned int switch_timer_interval,
621 unsigned int read_timer_interval)
622{
623 int ret, cpu;
624 struct channel *chan;
625
626 if (lib_ring_buffer_check_config(config, switch_timer_interval,
627 read_timer_interval))
628 return NULL;
629
630 chan = kzalloc(sizeof(struct channel), GFP_KERNEL);
631 if (!chan)
632 return NULL;
633
634 ret = channel_backend_init(&chan->backend, name, config, priv,
635 subbuf_size, num_subbuf);
636 if (ret)
637 goto error;
638
639 ret = channel_iterator_init(chan);
640 if (ret)
641 goto error_free_backend;
642
643 chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
644 chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
645 chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
646 kref_init(&chan->ref);
647 init_waitqueue_head(&chan->read_wait);
648 init_waitqueue_head(&chan->hp_wait);
649
650 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
651#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
652 /* Only benefit from NO_HZ idle with per-cpu buffers for now. */
653 chan->tick_nohz_notifier.notifier_call =
654 ring_buffer_tick_nohz_callback;
655 chan->tick_nohz_notifier.priority = ~0U;
656 atomic_notifier_chain_register(&tick_nohz_notifier,
657 &chan->tick_nohz_notifier);
658#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
659
660 /*
661 * In case of non-hotplug cpu, if the ring-buffer is allocated
662 * in early initcall, it will not be notified of secondary cpus.
663 * In that case, we need to allocate for all possible cpus.
664 */
665#ifdef CONFIG_HOTPLUG_CPU
666 chan->cpu_hp_notifier.notifier_call =
667 lib_ring_buffer_cpu_hp_callback;
668 chan->cpu_hp_notifier.priority = 6;
669 register_cpu_notifier(&chan->cpu_hp_notifier);
670
671 get_online_cpus();
672 for_each_online_cpu(cpu) {
673 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
674 cpu);
675 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
676 lib_ring_buffer_start_switch_timer(buf);
677 lib_ring_buffer_start_read_timer(buf);
678 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
679 }
680 chan->cpu_hp_enable = 1;
681 put_online_cpus();
682#else
683 for_each_possible_cpu(cpu) {
684 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
685 cpu);
686 spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
687 lib_ring_buffer_start_switch_timer(buf);
688 lib_ring_buffer_start_read_timer(buf);
689 spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
690 }
691#endif
692 } else {
693 struct lib_ring_buffer *buf = chan->backend.buf;
694
695 lib_ring_buffer_start_switch_timer(buf);
696 lib_ring_buffer_start_read_timer(buf);
697 }
698
699 return chan;
700
701error_free_backend:
702 channel_backend_free(&chan->backend);
703error:
704 kfree(chan);
705 return NULL;
706}
707EXPORT_SYMBOL_GPL(channel_create);
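/*
 * Illustrative sketch only: a typical channel lifecycle for a ring buffer
 * client. "client_config", the sizes and the timer intervals are placeholders
 * supplied by the client; they are not defined in this file.
 */
#if 0
static struct channel *example_channel_setup(void *priv)
{
	struct channel *chan;

	/* 16 sub-buffers of 64 kB, switch every 100 ms, no read timer. */
	chan = channel_create(&client_config, "example-chan", priv, NULL,
			      65536, 16, 100000, 0);
	if (!chan)
		return NULL;
	/* ... tracing runs, records are reserved and committed ... */
	return chan;
}

static void example_channel_teardown(struct channel *chan)
{
	void *priv;

	priv = channel_destroy(chan);	/* finalizes buffers, drops our reference */
	/* priv may be released by the client once readers are done. */
}
#endif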
708
709static
710void channel_release(struct kref *kref)
711{
712 struct channel *chan = container_of(kref, struct channel, ref);
713 channel_free(chan);
714}
715
716/**
717 * channel_destroy - Finalize, wait for q.s. and destroy channel.
718 * @chan: channel to destroy
719 *
720 * Holds cpu hotplug.
721 * Call "destroy" callback, finalize channels, and then decrement the
722 * channel reference count. Note that when readers have completed data
723 * consumption of finalized channels, get_subbuf() will return -ENODATA.
724 * They should release their handle at that point. Returns the private
725 * data pointer.
726 */
727void *channel_destroy(struct channel *chan)
728{
729 int cpu;
730 const struct lib_ring_buffer_config *config = &chan->backend.config;
731 void *priv;
732
733 channel_unregister_notifiers(chan);
734
735 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
736 /*
737 * No need to hold cpu hotplug, because all notifiers have been
738 * unregistered.
739 */
740 for_each_channel_cpu(cpu, chan) {
741 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
742 cpu);
743
744 if (config->cb.buffer_finalize)
745 config->cb.buffer_finalize(buf,
746 chan->backend.priv,
747 cpu);
748 if (buf->backend.allocated)
749 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
750 /*
751 * Perform flush before writing to finalized.
752 */
753 smp_wmb();
754 ACCESS_ONCE(buf->finalized) = 1;
755 wake_up_interruptible(&buf->read_wait);
756 }
757 } else {
758 struct lib_ring_buffer *buf = chan->backend.buf;
759
760 if (config->cb.buffer_finalize)
761 config->cb.buffer_finalize(buf, chan->backend.priv, -1);
762 if (buf->backend.allocated)
763 lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
764 /*
765 * Perform flush before writing to finalized.
766 */
767 smp_wmb();
768 ACCESS_ONCE(buf->finalized) = 1;
769 wake_up_interruptible(&buf->read_wait);
770 }
771 ACCESS_ONCE(chan->finalized) = 1;
772 wake_up_interruptible(&chan->hp_wait);
773 wake_up_interruptible(&chan->read_wait);
774 priv = chan->backend.priv;
775 kref_put(&chan->ref, channel_release);
776 return priv;
777}
778EXPORT_SYMBOL_GPL(channel_destroy);
779
780struct lib_ring_buffer *channel_get_ring_buffer(
781 const struct lib_ring_buffer_config *config,
782 struct channel *chan, int cpu)
783{
784 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
785 return chan->backend.buf;
786 else
787 return per_cpu_ptr(chan->backend.buf, cpu);
788}
789EXPORT_SYMBOL_GPL(channel_get_ring_buffer);
790
791int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
792{
793 struct channel *chan = buf->backend.chan;
794
795 if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
796 return -EBUSY;
797 if (!lttng_kref_get(&chan->ref)) {
798 atomic_long_dec(&buf->active_readers);
799 return -EOVERFLOW;
800 }
801 lttng_smp_mb__after_atomic();
802 return 0;
803}
804EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
805
806void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
807{
808 struct channel *chan = buf->backend.chan;
809
810 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
811 lttng_smp_mb__before_atomic();
812 atomic_long_dec(&buf->active_readers);
813 kref_put(&chan->ref, channel_release);
814}
815EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read);
816
817/*
818 * Promote compiler barrier to a smp_mb().
819 * For the specific ring buffer case, this IPI call should be removed if the
820 * architecture does not reorder writes. This should eventually be provided by
821 * a separate architecture-specific infrastructure.
822 */
823static void remote_mb(void *info)
824{
825 smp_mb();
826}
827
828/**
829 * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
830 * @buf: ring buffer
831 * @consumed: consumed count indicating the position where to read
832 * @produced: produced count, indicates position when to stop reading
833 *
834 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
835 * data to read at consumed position, or 0 if the get operation succeeds.
836 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
837 */
838
839int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
840 unsigned long *consumed, unsigned long *produced)
841{
842 struct channel *chan = buf->backend.chan;
843 const struct lib_ring_buffer_config *config = &chan->backend.config;
844 unsigned long consumed_cur, write_offset;
845 int finalized;
846
847retry:
848 finalized = ACCESS_ONCE(buf->finalized);
849 /*
850 * Read finalized before counters.
851 */
852 smp_rmb();
853 consumed_cur = atomic_long_read(&buf->consumed);
854 /*
855 * No need to issue a memory barrier between consumed count read and
856 * write offset read, because consumed count can only change
857 * concurrently in overwrite mode, and we keep a sequence counter
858 * identifier derived from the write offset to check we are getting
859 * the same sub-buffer we are expecting (the sub-buffers are atomically
860 * "tagged" upon writes, tags are checked upon read).
861 */
862 write_offset = v_read(config, &buf->offset);
863
864 /*
865 * Check that we are not about to read the same subbuffer in
866 * which the writer head is.
867 */
868 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
869 == 0)
870 goto nodata;
871
872 *consumed = consumed_cur;
873 *produced = subbuf_trunc(write_offset, chan);
874
875 return 0;
876
877nodata:
878 /*
879 * The memory barriers __wait_event()/wake_up_interruptible() take care
880 * of "raw_spin_is_locked" memory ordering.
881 */
882 if (finalized)
883 return -ENODATA;
884 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
885 goto retry;
886 else
887 return -EAGAIN;
888}
889EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot);
890
891/**
892 * lib_ring_buffer_move_consumer - move consumed counter forward
893 *
894 * Should only be called from consumer context.
895 * @buf: ring buffer
896 * @consumed_new: new consumed count value
897 */
898void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
899 unsigned long consumed_new)
900{
901 struct lib_ring_buffer_backend *bufb = &buf->backend;
902 struct channel *chan = bufb->chan;
903 unsigned long consumed;
904
905 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
906
907 /*
908 * Only push the consumed value forward.
909 * If the consumed cmpxchg fails, this is because we have been pushed by
910 * the writer in flight recorder mode.
911 */
912 consumed = atomic_long_read(&buf->consumed);
913 while ((long) consumed - (long) consumed_new < 0)
914 consumed = atomic_long_cmpxchg(&buf->consumed, consumed,
915 consumed_new);
916 /* Wake-up the metadata producer */
917 wake_up_interruptible(&buf->write_wait);
918}
919EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer);
920
921/**
922 * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
923 * @buf: ring buffer
924 * @consumed: consumed count indicating the position where to read
925 *
926 * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
927 * data to read at consumed position, or 0 if the get operation succeeds.
928 * Busy-loop trying to get data if the tick_nohz sequence lock is held.
929 */
930int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
931 unsigned long consumed)
932{
933 struct channel *chan = buf->backend.chan;
934 const struct lib_ring_buffer_config *config = &chan->backend.config;
935 unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
936 int ret;
937 int finalized;
938
939 if (buf->get_subbuf) {
940 /*
941 * Reader is trying to get a subbuffer twice.
942 */
943 CHAN_WARN_ON(chan, 1);
944 return -EBUSY;
945 }
946retry:
947 finalized = ACCESS_ONCE(buf->finalized);
948 /*
949 * Read finalized before counters.
950 */
951 smp_rmb();
952 consumed_cur = atomic_long_read(&buf->consumed);
953 consumed_idx = subbuf_index(consumed, chan);
954 commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
955 /*
956 * Make sure we read the commit count before reading the buffer
957 * data and the write offset. Correct consumed offset ordering
958 * wrt commit count is ensured by the use of cmpxchg to update
959 * the consumed offset.
960 * smp_call_function_single can fail if the remote CPU is offline,
961 * this is OK because then there is no wmb to execute there.
962 * If our thread is executing on the same CPU as the one the buffer
963 * belongs to, we don't have to synchronize it at all. If we are
964 * migrated, the scheduler will take care of the memory barriers.
965 * Normally, smp_call_function_single() should ensure program order when
966 * executing the remote function, which implies that it surrounds the
967 * function execution with :
968 * smp_mb()
969 * send IPI
970 * csd_lock_wait
971 * recv IPI
972 * smp_mb()
973 * exec. function
974 * smp_mb()
975 * csd unlock
976 * smp_mb()
977 *
978 * However, smp_call_function_single() does not seem to clearly execute
979 * such barriers. It depends on spinlock semantic to provide the barrier
980 * before executing the IPI and, when busy-looping, csd_lock_wait only
981 * executes smp_mb() when it has to wait for the other CPU.
982 *
983 * I don't trust this code. Therefore, let's add the smp_mb() sequence
984 * required ourselves, even if duplicated. It has no performance impact
985 * anyway.
986 *
987 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
988 * read and write vs write. They do not ensure core synchronization. We
989 * really have to ensure total order between the 3 barriers running on
990 * the 2 CPUs.
991 */
992 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
993 if (config->sync == RING_BUFFER_SYNC_PER_CPU
994 && config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
995 if (raw_smp_processor_id() != buf->backend.cpu) {
996 /* Total order with IPI handler smp_mb() */
997 smp_mb();
998 smp_call_function_single(buf->backend.cpu,
999 remote_mb, NULL, 1);
1000 /* Total order with IPI handler smp_mb() */
1001 smp_mb();
1002 }
1003 } else {
1004 /* Total order with IPI handler smp_mb() */
1005 smp_mb();
1006 smp_call_function(remote_mb, NULL, 1);
1007 /* Total order with IPI handler smp_mb() */
1008 smp_mb();
1009 }
1010 } else {
1011 /*
1012 * Local rmb to match the remote wmb to read the commit count
1013 * before the buffer data and the write offset.
1014 */
1015 smp_rmb();
1016 }
1017
1018 write_offset = v_read(config, &buf->offset);
1019
1020 /*
1021 * Check that the buffer we are getting is after or at consumed_cur
1022 * position.
1023 */
1024 if ((long) subbuf_trunc(consumed, chan)
1025 - (long) subbuf_trunc(consumed_cur, chan) < 0)
1026 goto nodata;
1027
1028 /*
1029 * Check that the subbuffer we are trying to consume has been
1030 * already fully committed.
1031 */
1032 if (((commit_count - chan->backend.subbuf_size)
1033 & chan->commit_count_mask)
1034 - (buf_trunc(consumed, chan)
1035 >> chan->backend.num_subbuf_order)
1036 != 0)
1037 goto nodata;
1038
1039 /*
1040 * Check that we are not about to read the same subbuffer in
1041 * which the writer head is.
1042 */
1043 if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan)
1044 == 0)
1045 goto nodata;
1046
1047 /*
1048 * Failure to get the subbuffer causes a busy-loop retry without going
1049 * to a wait queue. These are caused by short-lived race windows where
1050 * the writer is getting access to a subbuffer we were trying to get
1051 * access to. Also checks that the "consumed" buffer count we are
1052 * looking for matches the one contained in the subbuffer id.
1053 */
1054 ret = update_read_sb_index(config, &buf->backend, &chan->backend,
1055 consumed_idx, buf_trunc_val(consumed, chan));
1056 if (ret)
1057 goto retry;
1058 subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
1059
1060 buf->get_subbuf_consumed = consumed;
1061 buf->get_subbuf = 1;
1062
1063 return 0;
1064
1065nodata:
1066 /*
1067 * The memory barriers __wait_event()/wake_up_interruptible() take care
1068 * of "raw_spin_is_locked" memory ordering.
1069 */
1070 if (finalized)
1071 return -ENODATA;
1072 else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
1073 goto retry;
1074 else
1075 return -EAGAIN;
1076}
1077EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf);
1078
1079/**
1080 * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
1081 * @buf: ring buffer
1082 */
1083void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
1084{
1085 struct lib_ring_buffer_backend *bufb = &buf->backend;
1086 struct channel *chan = bufb->chan;
1087 const struct lib_ring_buffer_config *config = &chan->backend.config;
1088 unsigned long read_sb_bindex, consumed_idx, consumed;
1089
1090 CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
1091
1092 if (!buf->get_subbuf) {
1093 /*
1094 * Reader puts a subbuffer it did not get.
1095 */
1096 CHAN_WARN_ON(chan, 1);
1097 return;
1098 }
1099 consumed = buf->get_subbuf_consumed;
1100 buf->get_subbuf = 0;
1101
1102 /*
1103 * Clear the records_unread counter. (overruns counter)
1104 * Can still be non-zero if a file reader simply grabbed the data
1105 * without using iterators.
1106 * Can be below zero if an iterator is used on a snapshot more than
1107 * once.
1108 */
1109 read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
1110 v_add(config, v_read(config,
1111 &bufb->array[read_sb_bindex]->records_unread),
1112 &bufb->records_read);
1113 v_set(config, &bufb->array[read_sb_bindex]->records_unread, 0);
1114 CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
1115 && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
1116 subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
1117
1118 /*
1119 * Exchange the reader subbuffer with the one we put in its place in the
1120 * writer subbuffer table. Expect the original consumed count. If
1121 * update_read_sb_index fails, this is because the writer updated the
1122 * subbuffer concurrently. We should therefore keep the subbuffer we
1123 * currently have: it has become invalid to try reading this sub-buffer
1124 * consumed count value anyway.
1125 */
1126 consumed_idx = subbuf_index(consumed, chan);
1127 update_read_sb_index(config, &buf->backend, &chan->backend,
1128 consumed_idx, buf_trunc_val(consumed, chan));
1129 /*
1130 * update_read_sb_index return value ignored. Don't exchange sub-buffer
1131 * if the writer concurrently updated it.
1132 */
1133}
1134EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf);
1135
1136/*
1137 * cons_offset is an iterator on all subbuffer offsets between the reader
1138 * position and the writer position. (inclusive)
1139 */
1140static
1141void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
1142 struct channel *chan,
1143 unsigned long cons_offset,
1144 int cpu)
1145{
1146 const struct lib_ring_buffer_config *config = &chan->backend.config;
1147 unsigned long cons_idx, commit_count, commit_count_sb;
1148
1149 cons_idx = subbuf_index(cons_offset, chan);
1150 commit_count = v_read(config, &buf->commit_hot[cons_idx].cc);
1151 commit_count_sb = v_read(config, &buf->commit_cold[cons_idx].cc_sb);
1152
1153 if (subbuf_offset(commit_count, chan) != 0)
1154 printk(KERN_WARNING
1155 "ring buffer %s, cpu %d: "
1156 "commit count in subbuffer %lu,\n"
1157 "expecting multiples of %lu bytes\n"
1158 " [ %lu bytes committed, %lu bytes reader-visible ]\n",
1159 chan->backend.name, cpu, cons_idx,
1160 chan->backend.subbuf_size,
1161 commit_count, commit_count_sb);
1162
1163 printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n",
1164 chan->backend.name, cpu, commit_count);
1165}
1166
1167static
1168void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
1169 struct channel *chan,
1170 void *priv, int cpu)
1171{
1172 const struct lib_ring_buffer_config *config = &chan->backend.config;
1173 unsigned long write_offset, cons_offset;
1174
1175 /*
1176 * No need to order commit_count, write_offset and cons_offset reads
1177 * because we execute at teardown when no more writer nor reader
1178 * references are left.
1179 */
1180 write_offset = v_read(config, &buf->offset);
1181 cons_offset = atomic_long_read(&buf->consumed);
1182 if (write_offset != cons_offset)
1183 printk(KERN_DEBUG
1184 "ring buffer %s, cpu %d: "
1185 "non-consumed data\n"
1186 " [ %lu bytes written, %lu bytes read ]\n",
1187 chan->backend.name, cpu, write_offset, cons_offset);
1188
1189 for (cons_offset = atomic_long_read(&buf->consumed);
1190 (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
1191 chan)
1192 - cons_offset) > 0;
1193 cons_offset = subbuf_align(cons_offset, chan))
1194 lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
1195 cpu);
1196}
1197
1198static
1199void lib_ring_buffer_print_errors(struct channel *chan,
1200 struct lib_ring_buffer *buf, int cpu)
1201{
1202 const struct lib_ring_buffer_config *config = &chan->backend.config;
1203 void *priv = chan->backend.priv;
1204
1205 if (!strcmp(chan->backend.name, "relay-metadata")) {
1206 printk(KERN_DEBUG "ring buffer %s: %lu records written, "
1207 "%lu records overrun\n",
1208 chan->backend.name,
1209 v_read(config, &buf->records_count),
1210 v_read(config, &buf->records_overrun));
1211 } else {
1212 printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, "
1213 "%lu records overrun\n",
1214 chan->backend.name, cpu,
1215 v_read(config, &buf->records_count),
1216 v_read(config, &buf->records_overrun));
1217
1218 if (v_read(config, &buf->records_lost_full)
1219 || v_read(config, &buf->records_lost_wrap)
1220 || v_read(config, &buf->records_lost_big))
1221 printk(KERN_WARNING
1222 "ring buffer %s, cpu %d: records were lost. Caused by:\n"
1223 " [ %lu buffer full, %lu nest buffer wrap-around, "
1224 "%lu event too big ]\n",
1225 chan->backend.name, cpu,
1226 v_read(config, &buf->records_lost_full),
1227 v_read(config, &buf->records_lost_wrap),
1228 v_read(config, &buf->records_lost_big));
1229 }
1230 lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
1231}
1232
1233/*
1234 * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
1235 *
1236 * Only executed when the buffer is finalized, in SWITCH_FLUSH.
1237 */
1238static
1239void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
1240 struct channel *chan,
1241 struct switch_offsets *offsets,
1242 u64 tsc)
1243{
1244 const struct lib_ring_buffer_config *config = &chan->backend.config;
1245 unsigned long oldidx = subbuf_index(offsets->old, chan);
1246 unsigned long commit_count;
1247
1248 config->cb.buffer_begin(buf, tsc, oldidx);
1249
1250 /*
1251 * Order all writes to buffer before the commit count update that will
1252 * determine that the subbuffer is full.
1253 */
1254 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1255 /*
1256 * Must write slot data before incrementing commit count. This
1257 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1258 * by get_subbuf().
1259 */
1260 barrier();
1261 } else
1262 smp_wmb();
1263 v_add(config, config->cb.subbuffer_header_size(),
1264 &buf->commit_hot[oldidx].cc);
1265 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1266 /* Check if the written buffer has to be delivered */
1267 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
1268 commit_count, oldidx, tsc);
1269 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1270 offsets->old + config->cb.subbuffer_header_size(),
1271 commit_count);
1272}
1273
1274/*
1275 * lib_ring_buffer_switch_old_end: switch old subbuffer
1276 *
1277 * Note : offset_old should never be 0 here. It is ok, because we never perform
1278 * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
1279 * increments the offset_old value when doing a SWITCH_FLUSH on an empty
1280 * subbuffer.
1281 */
1282static
1283void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
1284 struct channel *chan,
1285 struct switch_offsets *offsets,
1286 u64 tsc)
1287{
1288 const struct lib_ring_buffer_config *config = &chan->backend.config;
1289 unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
1290 unsigned long commit_count, padding_size, data_size;
1291
1292 data_size = subbuf_offset(offsets->old - 1, chan) + 1;
1293 padding_size = chan->backend.subbuf_size - data_size;
1294 subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
1295
1296 /*
1297 * Order all writes to buffer before the commit count update that will
1298 * determine that the subbuffer is full.
1299 */
1300 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1301 /*
1302 * Must write slot data before incrementing commit count. This
1303 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1304 * by get_subbuf().
1305 */
1306 barrier();
1307 } else
1308 smp_wmb();
1309 v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
1310 commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
1311 lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
1312 commit_count, oldidx, tsc);
1313 lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
1314 offsets->old + padding_size, commit_count);
1315}
1316
1317/*
1318 * lib_ring_buffer_switch_new_start: Populate new subbuffer.
1319 *
1320 * This code can be executed unordered : writers may already have written to the
1321 * sub-buffer before this code gets executed, caution. The commit makes sure
1322 * that this code is executed before the delivery of this sub-buffer.
1323 */
1324static
1325void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
1326 struct channel *chan,
1327 struct switch_offsets *offsets,
1328 u64 tsc)
1329{
1330 const struct lib_ring_buffer_config *config = &chan->backend.config;
1331 unsigned long beginidx = subbuf_index(offsets->begin, chan);
1332 unsigned long commit_count;
1333
1334 config->cb.buffer_begin(buf, tsc, beginidx);
1335
1336 /*
1337 * Order all writes to buffer before the commit count update that will
1338 * determine that the subbuffer is full.
1339 */
1340 if (config->ipi == RING_BUFFER_IPI_BARRIER) {
1341 /*
1342 * Must write slot data before incrementing commit count. This
1343 * compiler barrier is upgraded into a smp_mb() by the IPI sent
1344 * by get_subbuf().
1345 */
1346 barrier();
1347 } else
1348 smp_wmb();
1349 v_add(config, config->cb.subbuffer_header_size(),
1350 &buf->commit_hot[beginidx].cc);
1351 commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
1352 /* Check if the written buffer has to be delivered */
1353 lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
1354 commit_count, beginidx, tsc);
1355 lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
1356 offsets->begin + config->cb.subbuffer_header_size(),
1357 commit_count);
1358}
1359
1360/*
1361 * lib_ring_buffer_switch_new_end: finish switching current subbuffer
1362 *
1363 * Calls subbuffer_set_data_size() to set the data size of the current
1364 * sub-buffer. We do not need to perform check_deliver nor commit here,
1365 * since this task will be done by the "commit" of the event for which
1366 * we are currently doing the space reservation.
1367 */
1368static
1369void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
1370 struct channel *chan,
1371 struct switch_offsets *offsets,
1372 u64 tsc)
1373{
1374 const struct lib_ring_buffer_config *config = &chan->backend.config;
1375 unsigned long endidx, data_size;
1376
1377 endidx = subbuf_index(offsets->end - 1, chan);
1378 data_size = subbuf_offset(offsets->end - 1, chan) + 1;
1379 subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
1380}
1381
1382/*
1383 * Returns :
1384 * 0 if ok
1385 * !0 if execution must be aborted.
1386 */
1387static
1388int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
1389 struct lib_ring_buffer *buf,
1390 struct channel *chan,
1391 struct switch_offsets *offsets,
1392 u64 *tsc)
1393{
1394 const struct lib_ring_buffer_config *config = &chan->backend.config;
1395 unsigned long off, reserve_commit_diff;
1396
1397 offsets->begin = v_read(config, &buf->offset);
1398 offsets->old = offsets->begin;
1399 offsets->switch_old_start = 0;
1400 off = subbuf_offset(offsets->begin, chan);
1401
1402 *tsc = config->cb.ring_buffer_clock_read(chan);
1403
1404 /*
1405 * Ensure we flush the header of an empty subbuffer when doing the
1406 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
1407 * total data gathering duration even if there were no records saved
1408 * after the last buffer switch.
1409 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
1410 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
1411 * subbuffer header as appropriate.
1412 * The next record that reserves space will be responsible for
1413 * populating the following subbuffer header. We choose not to populate
1414 * the next subbuffer header here because we want to be able to use
1415 * SWITCH_ACTIVE for periodical buffer flush and CPU tick_nohz stop
1416 * buffer flush, which must guarantee that all the buffer content
1417 * (records and header timestamps) are visible to the reader. This is
1418 * required for quiescence guarantees for the fusion merge.
1419 */
1420 if (mode != SWITCH_FLUSH && !off)
1421 return -1; /* we do not have to switch : buffer is empty */
1422
1423 if (unlikely(off == 0)) {
1424 unsigned long sb_index, commit_count;
1425
1426 /*
1427 * We are performing a SWITCH_FLUSH. At this stage, there are no
1428 * concurrent writes into the buffer.
1429 *
1430 * The client does not save any header information. Don't
1431 * switch empty subbuffer on finalize, because it is invalid to
1432 * deliver a completely empty subbuffer.
1433 */
1434 if (!config->cb.subbuffer_header_size())
1435 return -1;
1436
1437 /* Test new buffer integrity */
1438 sb_index = subbuf_index(offsets->begin, chan);
1439 commit_count = v_read(config,
1440 &buf->commit_cold[sb_index].cc_sb);
1441 reserve_commit_diff =
1442 (buf_trunc(offsets->begin, chan)
1443 >> chan->backend.num_subbuf_order)
1444 - (commit_count & chan->commit_count_mask);
1445 if (likely(reserve_commit_diff == 0)) {
1446 /* Next subbuffer not being written to. */
1447 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1448 subbuf_trunc(offsets->begin, chan)
1449 - subbuf_trunc((unsigned long)
1450 atomic_long_read(&buf->consumed), chan)
1451 >= chan->backend.buf_size)) {
1452 /*
1453 * We do not overwrite non consumed buffers
1454 * and we are full : don't switch.
1455 */
1456 return -1;
1457 } else {
1458 /*
1459 * Next subbuffer not being written to, and we
1460 * are either in overwrite mode or the buffer is
1461 * not full. It's safe to write in this new
1462 * subbuffer.
1463 */
1464 }
1465 } else {
1466 /*
1467 * Next subbuffer reserve offset does not match the
1468 * commit offset. Don't perform switch in
1469 * producer-consumer and overwrite mode. Caused by
1470 * either a writer OOPS or too many nested writes over a
1471 * reserve/commit pair.
1472 */
1473 return -1;
1474 }
1475
1476 /*
1477 * Need to write the subbuffer start header on finalize.
1478 */
1479 offsets->switch_old_start = 1;
1480 }
1481 offsets->begin = subbuf_align(offsets->begin, chan);
1482 /* Note: old points to the next subbuf at offset 0 */
1483 offsets->end = offsets->begin;
1484 return 0;
1485}
1486
1487/*
1488 * Force a sub-buffer switch. This operation is completely reentrant : can be
1489 * called while tracing is active with absolutely no lock held.
1490 *
1491 * Note, however, that as a v_cmpxchg is used for some atomic
1492 * operations, this function must be called from the CPU which owns the buffer
1493 * for an ACTIVE flush.
1494 */
1495void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
1496{
1497 struct channel *chan = buf->backend.chan;
1498 const struct lib_ring_buffer_config *config = &chan->backend.config;
1499 struct switch_offsets offsets;
1500 unsigned long oldidx;
1501 u64 tsc;
1502
1503 offsets.size = 0;
1504
1505 /*
1506 * Perform retryable operations.
1507 */
1508 do {
1509 if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
1510 &tsc))
1511 return; /* Switch not needed */
1512 } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
1513 != offsets.old);
1514
1515 /*
1516 * Atomically update last_tsc. This update races against concurrent
1517 * atomic updates, but the race will always cause supplementary full TSC
1518 * records, never the opposite (missing a full TSC record when it would
1519 * be needed).
1520 */
1521 save_last_tsc(config, buf, tsc);
1522
1523 /*
1524 * Push the reader if necessary
1525 */
1526 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
1527
1528 oldidx = subbuf_index(offsets.old, chan);
1529 lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
1530
1531 /*
1532 * May need to populate header start on SWITCH_FLUSH.
1533 */
1534 if (offsets.switch_old_start) {
1535 lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
1536 offsets.old += config->cb.subbuffer_header_size();
1537 }
1538
1539 /*
1540 * Switch old subbuffer.
1541 */
1542 lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
1543}
1544EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
1545
1546static void remote_switch(void *info)
1547{
1548 struct lib_ring_buffer *buf = info;
1549
1550 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
1551}
1552
1553void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
1554{
1555 struct channel *chan = buf->backend.chan;
1556 const struct lib_ring_buffer_config *config = &chan->backend.config;
1557 int ret;
1558
1559 /*
1560 * With global synchronization we don't need to use the IPI scheme.
1561 */
1562 if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
1563 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
1564 return;
1565 }
1566
1567 /*
1568 * Taking lock on CPU hotplug to ensure two things: first, that the
1569 * target cpu is not taken concurrently offline while we are within
1570 * smp_call_function_single() (I don't trust that get_cpu() on the
1571 * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be
1572 * confirmed)). Secondly, if it happens that the CPU is not online, our
1573 * own call to lib_ring_buffer_switch_slow() needs to be protected from
1574 * CPU hotplug handlers, which can also perform a remote subbuffer
1575 * switch.
1576 */
1577 get_online_cpus();
1578 ret = smp_call_function_single(buf->backend.cpu,
1579 remote_switch, buf, 1);
1580 if (ret) {
1581 /* Remote CPU is offline, do it ourself. */
1582 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
1583 }
1584 put_online_cpus();
1585}
1586EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
1587
1588/*
1589 * Returns :
1590 * 0 if ok
1591 * -ENOSPC if event size is too large for packet.
1592 * -ENOBUFS if there is currently not enough space in buffer for the event.
1593 * -EIO if data cannot be written into the buffer for any other reason.
1594 */
1595static
1596int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
1597 struct channel *chan,
1598 struct switch_offsets *offsets,
1599 struct lib_ring_buffer_ctx *ctx)
1600{
1601 const struct lib_ring_buffer_config *config = &chan->backend.config;
1602 unsigned long reserve_commit_diff, offset_cmp;
1603
1604retry:
1605 offsets->begin = offset_cmp = v_read(config, &buf->offset);
1606 offsets->old = offsets->begin;
1607 offsets->switch_new_start = 0;
1608 offsets->switch_new_end = 0;
1609 offsets->switch_old_end = 0;
1610 offsets->pre_header_padding = 0;
1611
1612 ctx->tsc = config->cb.ring_buffer_clock_read(chan);
1613 if ((int64_t) ctx->tsc == -EIO)
1614 return -EIO;
1615
1616 if (last_tsc_overflow(config, buf, ctx->tsc))
1617 ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
1618
1619 if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
1620 offsets->switch_new_start = 1; /* For offsets->begin */
1621 } else {
1622 offsets->size = config->cb.record_header_size(config, chan,
1623 offsets->begin,
1624 &offsets->pre_header_padding,
1625 ctx);
1626 offsets->size +=
1627 lib_ring_buffer_align(offsets->begin + offsets->size,
1628 ctx->largest_align)
1629 + ctx->data_size;
1630 if (unlikely(subbuf_offset(offsets->begin, chan) +
1631 offsets->size > chan->backend.subbuf_size)) {
1632 offsets->switch_old_end = 1; /* For offsets->old */
1633 offsets->switch_new_start = 1; /* For offsets->begin */
1634 }
1635 }
1636 if (unlikely(offsets->switch_new_start)) {
1637 unsigned long sb_index, commit_count;
1638
1639 /*
1640 * We are typically not filling the previous buffer completely.
1641 */
1642 if (likely(offsets->switch_old_end))
1643 offsets->begin = subbuf_align(offsets->begin, chan);
1644 offsets->begin = offsets->begin
1645 + config->cb.subbuffer_header_size();
1646 /* Test new buffer integrity */
1647 sb_index = subbuf_index(offsets->begin, chan);
1648 /*
1649 * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
1650 * lib_ring_buffer_check_deliver() has the matching
1651 * memory barriers required around commit_cold cc_sb
1652 * updates to ensure reserve and commit counter updates
1653 * are not seen reordered when updated by another CPU.
1654 */
1655 smp_rmb();
1656 commit_count = v_read(config,
1657 &buf->commit_cold[sb_index].cc_sb);
1658 /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
1659 smp_rmb();
1660 if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
1661 /*
1662 * The reserve counter has been concurrently updated
1663 * while we read the commit counter. This means the
1664 * commit counter we read might not match buf->offset
1665 * due to concurrent update. We therefore need to retry.
1666 */
1667 goto retry;
1668 }
1669 reserve_commit_diff =
1670 (buf_trunc(offsets->begin, chan)
1671 >> chan->backend.num_subbuf_order)
1672 - (commit_count & chan->commit_count_mask);
1673 if (likely(reserve_commit_diff == 0)) {
1674 /* Next subbuffer not being written to. */
1675 if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
1676 subbuf_trunc(offsets->begin, chan)
1677 - subbuf_trunc((unsigned long)
1678 atomic_long_read(&buf->consumed), chan)
1679 >= chan->backend.buf_size)) {
1680 /*
1681 * We do not overwrite non consumed buffers
1682 * and we are full : record is lost.
1683 */
1684 v_inc(config, &buf->records_lost_full);
1685 return -ENOBUFS;
1686 } else {
1687 /*
1688 * Next subbuffer not being written to, and we
1689 * are either in overwrite mode or the buffer is
1690 * not full. It's safe to write in this new
1691 * subbuffer.
1692 */
1693 }
1694 } else {
1695 /*
1696 * Next subbuffer reserve offset does not match the
1697 * commit offset, and this did not involve update to the
1698 * reserve counter. Drop record in producer-consumer and
1699 * overwrite mode. Caused by either a writer OOPS or
1700 * too many nested writes over a reserve/commit pair.
1701 */
1702 v_inc(config, &buf->records_lost_wrap);
1703 return -EIO;
1704 }
1705 offsets->size =
1706 config->cb.record_header_size(config, chan,
1707 offsets->begin,
1708 &offsets->pre_header_padding,
1709 ctx);
1710 offsets->size +=
1711 lib_ring_buffer_align(offsets->begin + offsets->size,
1712 ctx->largest_align)
1713 + ctx->data_size;
1714 if (unlikely(subbuf_offset(offsets->begin, chan)
1715 + offsets->size > chan->backend.subbuf_size)) {
1716 /*
1717 * Record too big for subbuffers, report error, don't
1718 * complete the sub-buffer switch.
1719 */
1720 v_inc(config, &buf->records_lost_big);
1721 return -ENOSPC;
1722 } else {
1723 /*
1724 * We just made a successful buffer switch and the
1725 * record fits in the new subbuffer. Let's write.
1726 */
1727 }
1728 } else {
1729 /*
1730 * Record fits in the current buffer and we are not on a switch
1731 * boundary. It's safe to write.
1732 */
1733 }
1734 offsets->end = offsets->begin + offsets->size;
1735
1736 if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
1737 /*
1738 * The offset_end will fall at the very beginning of the next
1739 * subbuffer.
1740 */
1741 offsets->switch_new_end = 1; /* For offsets->begin */
1742 }
1743 return 0;
1744}
1745
1746/**
1747 * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
1748 * @ctx: ring buffer context.
1749 *
1750 * Return : -ENOBUFS if not enough space, -ENOSPC if event size too large,
1751 * -EIO for other errors, else returns 0.
1752 * It will take care of sub-buffer switching.
1753 */
1754int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
1755{
1756 struct channel *chan = ctx->chan;
1757 const struct lib_ring_buffer_config *config = &chan->backend.config;
1758 struct lib_ring_buffer *buf;
1759 struct switch_offsets offsets;
1760 int ret;
1761
1762 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
1763 buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
1764 else
1765 buf = chan->backend.buf;
1766 ctx->buf = buf;
1767
1768 offsets.size = 0;
1769
1770 do {
1771 ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
1772 ctx);
1773 if (unlikely(ret))
1774 return ret;
1775 } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
1776 offsets.end)
1777 != offsets.old));
1778
1779 /*
1780 * Atomically update last_tsc. This update races against concurrent
1781 * atomic updates, but the race will always cause supplementary full TSC
1782 * records, never the opposite (missing a full TSC record when it would
1783 * be needed).
1784 */
1785 save_last_tsc(config, buf, ctx->tsc);
1786
1787 /*
1788 * Push the reader if necessary
1789 */
1790 lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
1791
1792 /*
1793 * Clear noref flag for this subbuffer.
1794 */
1795 lib_ring_buffer_clear_noref(config, &buf->backend,
1796 subbuf_index(offsets.end - 1, chan));
1797
1798 /*
1799 * Switch old subbuffer if needed.
1800 */
1801 if (unlikely(offsets.switch_old_end)) {
1802 lib_ring_buffer_clear_noref(config, &buf->backend,
1803 subbuf_index(offsets.old - 1, chan));
1804 lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
1805 }
1806
1807 /*
1808 * Populate new subbuffer.
1809 */
1810 if (unlikely(offsets.switch_new_start))
1811 lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
1812
1813 if (unlikely(offsets.switch_new_end))
1814 lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
1815
1816 ctx->slot_size = offsets.size;
1817 ctx->pre_offset = offsets.begin;
1818 ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
1819 return 0;
1820}
1821EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
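/*
 * Illustrative sketch only: how a client-side write path is expected to use
 * the reservation machinery. Real clients go through the fast-path wrappers
 * (lib_ring_buffer_ctx_init()/reserve()/write()/commit()), whose exact names
 * are assumed here from the frontend/backend API headers, and "client_config"
 * plus the payload layout are placeholders invented for the example.
 */
#if 0
static int example_write_record(struct channel *chan, const void *payload,
				size_t len)
{
	struct lib_ring_buffer_ctx ctx;
	int ret;

	/* priv, data_size, largest_align, cpu */
	lib_ring_buffer_ctx_init(&ctx, chan, NULL, len, sizeof(long),
				 smp_processor_id());
	ret = lib_ring_buffer_reserve(&client_config, &ctx);
	if (ret)
		return ret;	/* -ENOBUFS, -ENOSPC or -EIO: record is lost */
	lib_ring_buffer_write(&client_config, &ctx, payload, len);
	lib_ring_buffer_commit(&client_config, &ctx);
	return 0;
}
#endif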
1822
1823int __init init_lib_ring_buffer_frontend(void)
1824{
1825 int cpu;
1826
1827 for_each_possible_cpu(cpu)
1828 spin_lock_init(&per_cpu(ring_buffer_nohz_lock, cpu));
1829 return 0;
1830}
1831
1832module_init(init_lib_ring_buffer_frontend);
1833
1834void __exit exit_lib_ring_buffer_frontend(void)
1835{
1836}
1837
1838module_exit(exit_lib_ring_buffer_frontend);