/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
 *
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */
12 #include <wrapper/ringbuffer/iterator.h>
13 #include <linux/file.h>
14 #include <linux/uaccess.h>
15 #include <linux/jiffies.h>
16 #include <linux/delay.h>
17 #include <linux/module.h>
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
23 #define MAX_SYSTEM_LATENCY 250
26 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
28 #define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000)
31 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
35 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
36 * buffer is empty and finalized. The buffer must already be opened for reading.
38 ssize_t
lib_ring_buffer_get_next_record(struct channel
*chan
,
39 struct lib_ring_buffer
*buf
)
41 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
42 struct lib_ring_buffer_iter
*iter
= &buf
->iter
;
46 switch (iter
->state
) {
48 ret
= lib_ring_buffer_get_next_subbuf(buf
);
49 if (ret
&& !READ_ONCE(buf
->finalized
)
50 && config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
) {
52 * Use "pull" scheme for global buffers. The reader
53 * itself flushes the buffer to "pull" data not visible
54 * to readers yet. Flush current subbuffer and re-try.
56 * Per-CPU buffers rather use a "push" scheme because
57 * the IPI needed to flush all CPU's buffers is too
58 * costly. In the "push" scheme, the reader waits for
59 * the writer periodic timer to flush the
60 * buffers (keeping track of a quiescent state
61 * timestamp). Therefore, the writer "pushes" data out
62 * of the buffers rather than letting the reader "pull"
63 * data from the buffer.
65 lib_ring_buffer_switch_slow(buf
, SWITCH_ACTIVE
);
66 ret
= lib_ring_buffer_get_next_subbuf(buf
);
70 iter
->consumed
= buf
->cons_snapshot
;
71 iter
->data_size
= lib_ring_buffer_get_read_data_size(config
, buf
);
72 iter
->read_offset
= iter
->consumed
;
74 iter
->read_offset
+= config
->cb
.subbuffer_header_size();
75 iter
->state
= ITER_TEST_RECORD
;
77 case ITER_TEST_RECORD
:
78 if (iter
->read_offset
- iter
->consumed
>= iter
->data_size
) {
79 iter
->state
= ITER_PUT_SUBBUF
;
81 CHAN_WARN_ON(chan
, !config
->cb
.record_get
);
82 config
->cb
.record_get(config
, chan
, buf
,
87 iter
->read_offset
+= iter
->header_len
;
88 subbuffer_consume_record(config
, &buf
->backend
);
89 iter
->state
= ITER_NEXT_RECORD
;
90 return iter
->payload_len
;
93 case ITER_NEXT_RECORD
:
94 iter
->read_offset
+= iter
->payload_len
;
95 iter
->state
= ITER_TEST_RECORD
;
98 lib_ring_buffer_put_next_subbuf(buf
);
99 iter
->state
= ITER_GET_SUBBUF
;
102 CHAN_WARN_ON(chan
, 1); /* Should not happen */
106 EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record
);
108 static int buf_is_higher(void *a
, void *b
)
110 struct lib_ring_buffer
*bufa
= a
;
111 struct lib_ring_buffer
*bufb
= b
;
113 /* Consider lowest timestamps to be at the top of the heap */
114 return (bufa
->iter
.timestamp
< bufb
->iter
.timestamp
);
118 void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config
*config
,
119 struct channel
*chan
)
121 struct lttng_ptr_heap
*heap
= &chan
->iter
.heap
;
122 struct lib_ring_buffer
*buf
, *tmp
;
125 list_for_each_entry_safe(buf
, tmp
, &chan
->iter
.empty_head
,
127 len
= lib_ring_buffer_get_next_record(chan
, buf
);
130 * Deal with -EAGAIN and -ENODATA.
131 * len >= 0 means record contains data.
132 * -EBUSY should never happen, because we support only one
137 /* Keep node in empty list */
141 * Buffer is finalized. Don't add to list of empty
142 * buffer, because it has no more data to provide, ever.
144 list_del(&buf
->iter
.empty_node
);
147 CHAN_WARN_ON(chan
, 1);
151 * Insert buffer into the heap, remove from empty buffer
154 CHAN_WARN_ON(chan
, len
< 0);
155 list_del(&buf
->iter
.empty_node
);
156 CHAN_WARN_ON(chan
, lttng_heap_insert(heap
, buf
));
162 void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config
*config
,
163 struct channel
*chan
)
166 unsigned long wait_msecs
;
169 * No need to wait if no empty buffers are present.
171 if (list_empty(&chan
->iter
.empty_head
))
174 timestamp_qs
= config
->cb
.ring_buffer_clock_read(chan
);
176 * We need to consider previously empty buffers.
177 * Do a get next buf record on each of them. Add them to
178 * the heap if they have data. If at least one of them
179 * don't have data, we need to wait for
180 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
181 * buffers have been switched either by the timer or idle entry) and
182 * check them again, adding them if they have data.
184 lib_ring_buffer_get_empty_buf_records(config
, chan
);
187 * No need to wait if no empty buffers are present.
189 if (list_empty(&chan
->iter
.empty_head
))
193 * We need to wait for the buffer switch timer to run. If the
194 * CPU is idle, idle entry performed the switch.
195 * TODO: we could optimize further by skipping the sleep if all
196 * empty buffers belong to idle or offline cpus.
198 wait_msecs
= jiffies_to_msecs(chan
->switch_timer_interval
);
199 wait_msecs
+= MAX_SYSTEM_LATENCY
;
201 lib_ring_buffer_get_empty_buf_records(config
, chan
);
203 * Any buffer still in the empty list here cannot possibly
204 * contain an event with a timestamp prior to "timestamp_qs".
205 * The new quiescent state timestamp is the one we grabbed
206 * before waiting for buffer data. It is therefore safe to
207 * ignore empty buffers up to last_qs timestamp for fusion
210 chan
->iter
.last_qs
= timestamp_qs
;
214 * channel_get_next_record - Get the next record in a channel.
216 * @ret_buf: the buffer in which the event is located (output)
218 * Returns the size of new current event, -EAGAIN if all buffers are empty,
219 * -ENODATA if all buffers are empty and finalized. The channel must already be
220 * opened for reading.
223 ssize_t
channel_get_next_record(struct channel
*chan
,
224 struct lib_ring_buffer
**ret_buf
)
226 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
227 struct lib_ring_buffer
*buf
;
228 struct lttng_ptr_heap
*heap
;
231 if (config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
) {
232 *ret_buf
= channel_get_ring_buffer(config
, chan
, 0);
233 return lib_ring_buffer_get_next_record(chan
, *ret_buf
);
236 heap
= &chan
->iter
.heap
;
239 * get next record for topmost buffer.
241 buf
= lttng_heap_maximum(heap
);
243 len
= lib_ring_buffer_get_next_record(chan
, buf
);
245 * Deal with -EAGAIN and -ENODATA.
246 * len >= 0 means record contains data.
250 buf
->iter
.timestamp
= 0;
251 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
252 /* Remove topmost buffer from the heap */
253 CHAN_WARN_ON(chan
, lttng_heap_remove(heap
) != buf
);
257 * Buffer is finalized. Remove buffer from heap and
258 * don't add to list of empty buffer, because it has no
259 * more data to provide, ever.
261 CHAN_WARN_ON(chan
, lttng_heap_remove(heap
) != buf
);
264 CHAN_WARN_ON(chan
, 1);
268 * Reinsert buffer into the heap. Note that heap can be
269 * partially empty, so we need to use
270 * lttng_heap_replace_max().
272 CHAN_WARN_ON(chan
, len
< 0);
273 CHAN_WARN_ON(chan
, lttng_heap_replace_max(heap
, buf
) != buf
);
278 buf
= lttng_heap_maximum(heap
);
279 if (!buf
|| buf
->iter
.timestamp
> chan
->iter
.last_qs
) {
281 * Deal with buffers previously showing no data.
282 * Add buffers containing data to the heap, update
285 lib_ring_buffer_wait_for_qs(config
, chan
);
288 *ret_buf
= buf
= lttng_heap_maximum(heap
);
291 * If this warning triggers, you probably need to check your
292 * system interrupt latency. Typical causes: too many printk()
293 * output going to a serial console with interrupts off.
294 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
295 * Observed on SMP KVM setups with trace_clock().
297 if (chan
->iter
.last_timestamp
298 > (buf
->iter
.timestamp
+ MAX_CLOCK_DELTA
)) {
299 printk(KERN_WARNING
"ring_buffer: timestamps going "
300 "backward. Last time %llu ns, cpu %d, "
301 "current time %llu ns, cpu %d, "
303 chan
->iter
.last_timestamp
, chan
->iter
.last_cpu
,
304 buf
->iter
.timestamp
, buf
->backend
.cpu
,
305 chan
->iter
.last_timestamp
- buf
->iter
.timestamp
);
306 CHAN_WARN_ON(chan
, 1);
308 chan
->iter
.last_timestamp
= buf
->iter
.timestamp
;
309 chan
->iter
.last_cpu
= buf
->backend
.cpu
;
310 return buf
->iter
.payload_len
;
313 if (list_empty(&chan
->iter
.empty_head
))
314 return -ENODATA
; /* All buffers finalized */
316 return -EAGAIN
; /* Temporarily empty */
319 EXPORT_SYMBOL_GPL(channel_get_next_record
);
322 void lib_ring_buffer_iterator_init(struct channel
*chan
, struct lib_ring_buffer
*buf
)
324 if (buf
->iter
.allocated
)
327 buf
->iter
.allocated
= 1;
328 if (chan
->iter
.read_open
&& !buf
->iter
.read_open
) {
329 CHAN_WARN_ON(chan
, lib_ring_buffer_open_read(buf
) != 0);
330 buf
->iter
.read_open
= 1;
333 /* Add to list of buffers without any current record */
334 if (chan
->backend
.config
.alloc
== RING_BUFFER_ALLOC_PER_CPU
)
335 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
338 int lttng_cpuhp_rb_iter_online(unsigned int cpu
,
339 struct lttng_cpuhp_node
*node
)
341 struct channel
*chan
= container_of(node
, struct channel
,
343 struct lib_ring_buffer
*buf
= per_cpu_ptr(chan
->backend
.buf
, cpu
);
344 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
346 CHAN_WARN_ON(chan
, config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
);
348 lib_ring_buffer_iterator_init(chan
, buf
);
351 EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online
);
353 int channel_iterator_init(struct channel
*chan
)
355 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
356 struct lib_ring_buffer
*buf
;
358 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
361 INIT_LIST_HEAD(&chan
->iter
.empty_head
);
362 ret
= lttng_heap_init(&chan
->iter
.heap
,
364 GFP_KERNEL
, buf_is_higher
);
368 chan
->cpuhp_iter_online
.component
= LTTNG_RING_BUFFER_ITER
;
369 ret
= cpuhp_state_add_instance(lttng_rb_hp_online
,
370 &chan
->cpuhp_iter_online
.node
);
374 buf
= channel_get_ring_buffer(config
, chan
, 0);
375 lib_ring_buffer_iterator_init(chan
, buf
);
380 void channel_iterator_unregister_notifiers(struct channel
*chan
)
382 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
384 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
388 ret
= cpuhp_state_remove_instance(lttng_rb_hp_online
,
389 &chan
->cpuhp_iter_online
.node
);
395 void channel_iterator_free(struct channel
*chan
)
397 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
399 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
)
400 lttng_heap_free(&chan
->iter
.heap
);
403 int lib_ring_buffer_iterator_open(struct lib_ring_buffer
*buf
)
405 struct channel
*chan
= buf
->backend
.chan
;
406 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
407 CHAN_WARN_ON(chan
, config
->output
!= RING_BUFFER_ITERATOR
);
408 return lib_ring_buffer_open_read(buf
);
410 EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open
);
/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
423 int channel_iterator_open(struct channel
*chan
)
425 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
426 struct lib_ring_buffer
*buf
;
429 CHAN_WARN_ON(chan
, config
->output
!= RING_BUFFER_ITERATOR
);
431 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
433 /* Allow CPU hotplug to keep track of opened reader */
434 chan
->iter
.read_open
= 1;
435 for_each_channel_cpu(cpu
, chan
) {
436 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
437 ret
= lib_ring_buffer_iterator_open(buf
);
440 buf
->iter
.read_open
= 1;
444 buf
= channel_get_ring_buffer(config
, chan
, 0);
445 ret
= lib_ring_buffer_iterator_open(buf
);
449 /* Error should always happen on CPU 0, hence no close is required. */
450 CHAN_WARN_ON(chan
, cpu
!= 0);
454 EXPORT_SYMBOL_GPL(channel_iterator_open
);
456 void channel_iterator_release(struct channel
*chan
)
458 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
459 struct lib_ring_buffer
*buf
;
462 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
464 for_each_channel_cpu(cpu
, chan
) {
465 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
466 if (buf
->iter
.read_open
) {
467 lib_ring_buffer_iterator_release(buf
);
468 buf
->iter
.read_open
= 0;
471 chan
->iter
.read_open
= 0;
474 buf
= channel_get_ring_buffer(config
, chan
, 0);
475 lib_ring_buffer_iterator_release(buf
);
478 EXPORT_SYMBOL_GPL(channel_iterator_release
);
480 void lib_ring_buffer_iterator_reset(struct lib_ring_buffer
*buf
)
482 struct channel
*chan
= buf
->backend
.chan
;
484 if (buf
->iter
.state
!= ITER_GET_SUBBUF
)
485 lib_ring_buffer_put_next_subbuf(buf
);
486 buf
->iter
.state
= ITER_GET_SUBBUF
;
487 /* Remove from heap (if present). */
488 if (lttng_heap_cherrypick(&chan
->iter
.heap
, buf
))
489 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
490 buf
->iter
.timestamp
= 0;
491 buf
->iter
.header_len
= 0;
492 buf
->iter
.payload_len
= 0;
493 buf
->iter
.consumed
= 0;
494 buf
->iter
.read_offset
= 0;
495 buf
->iter
.data_size
= 0;
496 /* Don't reset allocated and read_open */
499 void channel_iterator_reset(struct channel
*chan
)
501 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
502 struct lib_ring_buffer
*buf
;
505 /* Empty heap, put into empty_head */
506 while ((buf
= lttng_heap_remove(&chan
->iter
.heap
)) != NULL
)
507 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
509 for_each_channel_cpu(cpu
, chan
) {
510 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
511 lib_ring_buffer_iterator_reset(buf
);
513 /* Don't reset read_open */
514 chan
->iter
.last_qs
= 0;
515 chan
->iter
.last_timestamp
= 0;
516 chan
->iter
.last_cpu
= 0;
517 chan
->iter
.len_left
= 0;
521 * Ring buffer payload extraction read() implementation.
524 ssize_t
channel_ring_buffer_file_read(struct file
*filp
,
525 char __user
*user_buf
,
528 struct channel
*chan
,
529 struct lib_ring_buffer
*buf
,
532 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
533 size_t read_count
= 0, read_offset
;
537 if (!access_ok(user_buf
, count
))
540 /* Finish copy of previous record */
542 if (read_count
< count
) {
543 len
= chan
->iter
.len_left
;
545 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
547 buf
= lttng_heap_maximum(&chan
->iter
.heap
);
548 CHAN_WARN_ON(chan
, !buf
);
553 while (read_count
< count
) {
554 size_t copy_len
, space_left
;
557 len
= channel_get_next_record(chan
, &buf
);
559 len
= lib_ring_buffer_get_next_record(chan
, buf
);
563 * Check if buffer is finalized (end of file).
565 if (len
== -ENODATA
) {
566 /* A 0 read_count will tell about end of file */
569 if (filp
->f_flags
& O_NONBLOCK
) {
571 read_count
= -EAGAIN
;
577 * No data available at the moment, return what
584 * Wait for returned len to be >= 0 or -ENODATA.
587 error
= wait_event_interruptible(
589 ((len
= channel_get_next_record(chan
,
590 &buf
)), len
!= -EAGAIN
));
592 error
= wait_event_interruptible(
594 ((len
= lib_ring_buffer_get_next_record(
595 chan
, buf
)), len
!= -EAGAIN
));
596 CHAN_WARN_ON(chan
, len
== -EBUSY
);
601 CHAN_WARN_ON(chan
, len
< 0 && len
!= -ENODATA
);
605 read_offset
= buf
->iter
.read_offset
;
607 space_left
= count
- read_count
;
608 if (len
<= space_left
) {
610 chan
->iter
.len_left
= 0;
613 copy_len
= space_left
;
614 chan
->iter
.len_left
= len
- copy_len
;
615 *ppos
= read_offset
+ copy_len
;
617 if (__lib_ring_buffer_copy_to_user(&buf
->backend
, read_offset
,
618 &user_buf
[read_count
],
621 * Leave the len_left and ppos values at their current
622 * state, as we currently have a valid event to read.
626 read_count
+= copy_len
;
632 chan
->iter
.len_left
= 0;
637 * lib_ring_buffer_file_read - Read buffer record payload.
638 * @filp: file structure pointer.
639 * @buffer: user buffer to read data into.
640 * @count: number of bytes to read.
641 * @ppos: file read position.
643 * Returns a negative value on error, or the number of bytes read on success.
644 * ppos is used to save the position _within the current record_ between calls
648 ssize_t
lib_ring_buffer_file_read(struct file
*filp
,
649 char __user
*user_buf
,
653 struct inode
*inode
= filp
->f_path
.dentry
->d_inode
;
654 struct lib_ring_buffer
*buf
= inode
->i_private
;
655 struct channel
*chan
= buf
->backend
.chan
;
657 return channel_ring_buffer_file_read(filp
, user_buf
, count
, ppos
,
662 * channel_file_read - Read channel record payload.
663 * @filp: file structure pointer.
664 * @buffer: user buffer to read data into.
665 * @count: number of bytes to read.
666 * @ppos: file read position.
668 * Returns a negative value on error, or the number of bytes read on success.
669 * ppos is used to save the position _within the current record_ between calls
673 ssize_t
channel_file_read(struct file
*filp
,
674 char __user
*user_buf
,
678 struct inode
*inode
= filp
->f_path
.dentry
->d_inode
;
679 struct channel
*chan
= inode
->i_private
;
680 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
682 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
)
683 return channel_ring_buffer_file_read(filp
, user_buf
, count
,
684 ppos
, chan
, NULL
, 1);
686 struct lib_ring_buffer
*buf
=
687 channel_get_ring_buffer(config
, chan
, 0);
688 return channel_ring_buffer_file_read(filp
, user_buf
, count
,
694 int lib_ring_buffer_file_open(struct inode
*inode
, struct file
*file
)
696 struct lib_ring_buffer
*buf
= inode
->i_private
;
699 ret
= lib_ring_buffer_iterator_open(buf
);
703 file
->private_data
= buf
;
704 ret
= nonseekable_open(inode
, file
);
710 lib_ring_buffer_iterator_release(buf
);
715 int lib_ring_buffer_file_release(struct inode
*inode
, struct file
*file
)
717 struct lib_ring_buffer
*buf
= inode
->i_private
;
719 lib_ring_buffer_iterator_release(buf
);
724 int channel_file_open(struct inode
*inode
, struct file
*file
)
726 struct channel
*chan
= inode
->i_private
;
729 ret
= channel_iterator_open(chan
);
733 file
->private_data
= chan
;
734 ret
= nonseekable_open(inode
, file
);
740 channel_iterator_release(chan
);
745 int channel_file_release(struct inode
*inode
, struct file
*file
)
747 struct channel
*chan
= inode
->i_private
;
749 channel_iterator_release(chan
);
753 const struct file_operations channel_payload_file_operations
= {
754 .owner
= THIS_MODULE
,
755 .open
= channel_file_open
,
756 .release
= channel_file_release
,
757 .read
= channel_file_read
,
758 .llseek
= vfs_lib_ring_buffer_no_llseek
,
760 EXPORT_SYMBOL_GPL(channel_payload_file_operations
);
762 const struct file_operations lib_ring_buffer_payload_file_operations
= {
763 .owner
= THIS_MODULE
,
764 .open
= lib_ring_buffer_file_open
,
765 .release
= lib_ring_buffer_file_release
,
766 .read
= lib_ring_buffer_file_read
,
767 .llseek
= vfs_lib_ring_buffer_no_llseek
,
769 EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations
);