/* SPDX-License-Identifier: (GPL-2.0 OR LGPL-2.1)
 *
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */

#include <wrapper/ringbuffer/iterator.h>
#include <wrapper/file.h>
#include <linux/jiffies.h>
#include <linux/delay.h>
#include <linux/module.h>

/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)
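/*
 * For example (assuming HZ=250, hence a 4 ms jiffy), MAX_CLOCK_DELTA
 * evaluates to 4,000,000 ns.
 */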

/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
 * buffer is empty and finalized. The buffer must already be opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
					struct lib_ring_buffer *buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer_iter *iter = &buf->iter;
	int ret;

restart:
	switch (iter->state) {
	case ITER_GET_SUBBUF:
		ret = lib_ring_buffer_get_next_subbuf(buf);
		if (ret && !READ_ONCE(buf->finalized)
		    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
			/*
			 * Use "pull" scheme for global buffers. The reader
			 * itself flushes the buffer to "pull" data not visible
			 * to readers yet. Flush current subbuffer and re-try.
			 *
			 * Per-CPU buffers use a "push" scheme instead, because
			 * the IPI needed to flush all CPUs' buffers is too
			 * costly. In the "push" scheme, the reader waits for
			 * the writer periodic timer to flush the
			 * buffers (keeping track of a quiescent state
			 * timestamp). Therefore, the writer "pushes" data out
			 * of the buffers rather than letting the reader "pull"
			 * data from the buffer.
			 */
			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
			ret = lib_ring_buffer_get_next_subbuf(buf);
		}
		if (ret)
			return ret;
		iter->consumed = buf->cons_snapshot;
		iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
		iter->read_offset = iter->consumed;
		/* skip header */
		iter->read_offset += config->cb.subbuffer_header_size();
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_TEST_RECORD:
		if (iter->read_offset - iter->consumed >= iter->data_size) {
			iter->state = ITER_PUT_SUBBUF;
		} else {
			CHAN_WARN_ON(chan, !config->cb.record_get);
			config->cb.record_get(config, chan, buf,
					      iter->read_offset,
					      &iter->header_len,
					      &iter->payload_len,
					      &iter->timestamp);
			iter->read_offset += iter->header_len;
			subbuffer_consume_record(config, &buf->backend);
			iter->state = ITER_NEXT_RECORD;
			return iter->payload_len;
		}
		goto restart;
	case ITER_NEXT_RECORD:
		iter->read_offset += iter->payload_len;
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_PUT_SUBBUF:
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
		goto restart;
	default:
		CHAN_WARN_ON(chan, 1);	/* Should not happen */
		return -EPERM;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);

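/*
 * Illustrative sketch (assuming the buffer was already opened for reading
 * with lib_ring_buffer_iterator_open(); the "drain" below only sums payload
 * sizes): a typical in-kernel consumer loop around
 * lib_ring_buffer_get_next_record(), handling the -EAGAIN and -ENODATA
 * return values documented above.
 */
static __maybe_unused
ssize_t lib_ring_buffer_drain_records_example(struct channel *chan,
					      struct lib_ring_buffer *buf)
{
	ssize_t len, total = 0;

	for (;;) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		if (len == -EAGAIN)
			break;		/* Buffer temporarily empty. */
		if (len == -ENODATA)
			break;		/* Buffer empty and finalized. */
		if (len < 0)
			return len;	/* Unexpected error. */
		/* Payload starts at buf->iter.read_offset, length is len. */
		total += len;
	}
	return total;
}
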
static int buf_is_higher(void *a, void *b)
{
	struct lib_ring_buffer *bufa = a;
	struct lib_ring_buffer *bufb = b;

	/* Consider lowest timestamps to be at the top of the heap */
	return (bufa->iter.timestamp < bufb->iter.timestamp);
}

static
void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
					   struct channel *chan)
{
	struct lttng_ptr_heap *heap = &chan->iter.heap;
	struct lib_ring_buffer *buf, *tmp;
	ssize_t len;

	list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
				 iter.empty_node) {
		len = lib_ring_buffer_get_next_record(chan, buf);

		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 * -EBUSY should never happen, because we support only one
		 * reader.
		 */
		switch (len) {
		case -EAGAIN:
			/* Keep node in empty list */
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Don't add it to the list of
			 * empty buffers, because it has no more data to
			 * provide, ever.
			 */
			list_del(&buf->iter.empty_node);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Insert buffer into the heap, remove from empty buffer
			 * list.
			 */
			CHAN_WARN_ON(chan, len < 0);
			list_del(&buf->iter.empty_node);
			CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
		}
	}
}

static
void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
				 struct channel *chan)
{
	u64 timestamp_qs;
	unsigned long wait_msecs;

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
	/*
	 * We need to consider previously empty buffers.
	 * Do a "get next record" on each of them, and add them to the heap
	 * if they have data. If at least one of them doesn't have data, we
	 * need to wait for switch_timer_interval + MAX_SYSTEM_LATENCY (so we
	 * are sure the buffers have been switched either by the timer or by
	 * idle entry) and check them again, adding them if they have data.
	 */
	lib_ring_buffer_get_empty_buf_records(config, chan);

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/*
	 * We need to wait for the buffer switch timer to run. If the
	 * CPU is idle, idle entry performed the switch.
	 * TODO: we could optimize further by skipping the sleep if all
	 * empty buffers belong to idle or offline cpus.
	 */
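	/*
	 * For example, with a one-second switch timer interval this sleeps
	 * roughly 1000 ms + MAX_SYSTEM_LATENCY = 1250 ms before rechecking
	 * the previously empty buffers.
	 */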
	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
	wait_msecs += MAX_SYSTEM_LATENCY;
	msleep(wait_msecs);
	lib_ring_buffer_get_empty_buf_records(config, chan);
	/*
	 * Any buffer still in the empty list here cannot possibly
	 * contain an event with a timestamp prior to "timestamp_qs".
	 * The new quiescent state timestamp is the one we grabbed
	 * before waiting for buffer data. It is therefore safe to
	 * ignore empty buffers up to last_qs timestamp for fusion
	 * merge.
	 */
	chan->iter.last_qs = timestamp_qs;
}

/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of the new current event, -EAGAIN if all buffers are empty,
 * -ENODATA if all buffers are empty and finalized. The channel must already be
 * opened for reading.
 */
ssize_t channel_get_next_record(struct channel *chan,
				struct lib_ring_buffer **ret_buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	struct lttng_ptr_heap *heap;
	ssize_t len;

	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		*ret_buf = channel_get_ring_buffer(config, chan, 0);
		return lib_ring_buffer_get_next_record(chan, *ret_buf);
	}

	heap = &chan->iter.heap;

	/*
	 * Get the next record for the topmost buffer.
	 */
	buf = lttng_heap_maximum(heap);
	if (buf) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 */
		switch (len) {
		case -EAGAIN:
			buf->iter.timestamp = 0;
			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
			/* Remove topmost buffer from the heap */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Remove buffer from heap and
			 * don't add it to the list of empty buffers, because
			 * it has no more data to provide, ever.
			 */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Reinsert buffer into the heap. Note that heap can be
			 * partially empty, so we need to use
			 * lttng_heap_replace_max().
			 */
			CHAN_WARN_ON(chan, len < 0);
			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
			break;
		}
	}

	buf = lttng_heap_maximum(heap);
	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
		/*
		 * Deal with buffers previously showing no data.
		 * Add buffers containing data to the heap, update
		 * last_qs.
		 */
		lib_ring_buffer_wait_for_qs(config, chan);
	}

	*ret_buf = buf = lttng_heap_maximum(heap);
	if (buf) {
		/*
		 * If this warning triggers, you probably need to check your
		 * system interrupt latency. Typical causes: too much printk()
		 * output going to a serial console with interrupts off.
		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
		 * Observed on SMP KVM setups with trace_clock().
		 */
		if (chan->iter.last_timestamp
		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
			printk(KERN_WARNING "ring_buffer: timestamps going "
			       "backward. Last time %llu ns, cpu %d, "
			       "current time %llu ns, cpu %d, "
			       "delta %llu ns.\n",
			       chan->iter.last_timestamp, chan->iter.last_cpu,
			       buf->iter.timestamp, buf->backend.cpu,
			       chan->iter.last_timestamp - buf->iter.timestamp);
			CHAN_WARN_ON(chan, 1);
		}
		chan->iter.last_timestamp = buf->iter.timestamp;
		chan->iter.last_cpu = buf->backend.cpu;
		return buf->iter.payload_len;
	} else {
		/* Heap is empty */
		if (list_empty(&chan->iter.empty_head))
			return -ENODATA;	/* All buffers finalized */
		else
			return -EAGAIN;		/* Temporarily empty */
	}
}
EXPORT_SYMBOL_GPL(channel_get_next_record);
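
/*
 * Illustrative sketch (assuming channel_iterator_open() has already been
 * called on a per-CPU channel): fetch the next record across all per-CPU
 * buffers in timestamp order and report its timestamp through "ts".
 */
static __maybe_unused
ssize_t channel_peek_next_record_example(struct channel *chan, u64 *ts)
{
	struct lib_ring_buffer *buf;
	ssize_t len;

	len = channel_get_next_record(chan, &buf);
	if (len >= 0)
		*ts = buf->iter.timestamp;
	return len;
}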

static
void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
{
	if (buf->iter.allocated)
		return;

	buf->iter.allocated = 1;
	if (chan->iter.read_open && !buf->iter.read_open) {
		CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
		buf->iter.read_open = 1;
	}

	/* Add to list of buffers without any current record */
	if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
}

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))

int lttng_cpuhp_rb_iter_online(unsigned int cpu,
		struct lttng_cpuhp_node *node)
{
	struct channel *chan = container_of(node, struct channel,
					    cpuhp_iter_online);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	lib_ring_buffer_iterator_init(chan, buf);
	return 0;
}
EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);

#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

#ifdef CONFIG_HOTPLUG_CPU
static
int channel_iterator_cpu_hotplug(struct notifier_block *nb,
		unsigned long action,
		void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel *chan = container_of(nb, struct channel,
					    hp_iter_notifier);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif

#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

int channel_iterator_init(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		int ret;

		INIT_LIST_HEAD(&chan->iter.empty_head);
		ret = lttng_heap_init(&chan->iter.heap,
				      num_possible_cpus(),
				      GFP_KERNEL, buf_is_higher);
		if (ret)
			return ret;

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
		ret = cpuhp_state_add_instance(lttng_rb_hp_online,
					       &chan->cpuhp_iter_online.node);
		if (ret)
			return ret;
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		{
			int cpu;

			/*
			 * Without CPU hotplug support, if the ring buffer is
			 * allocated in an early initcall, it will not be
			 * notified of secondary CPUs. In that case, we need to
			 * allocate for all possible CPUs.
			 */
#ifdef CONFIG_HOTPLUG_CPU
			chan->hp_iter_notifier.notifier_call =
				channel_iterator_cpu_hotplug;
			chan->hp_iter_notifier.priority = 10;
			register_cpu_notifier(&chan->hp_iter_notifier);

			get_online_cpus();
			for_each_online_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
			chan->hp_iter_enable = 1;
			put_online_cpus();
#else
			for_each_possible_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
#endif
		}
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_init(chan, buf);
	}
	return 0;
}

void channel_iterator_unregister_notifiers(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		{
			int ret;

			ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
					&chan->cpuhp_iter_online.node);
			WARN_ON(ret);
		}
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		chan->hp_iter_enable = 0;
		unregister_cpu_notifier(&chan->hp_iter_notifier);
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	}
}

void channel_iterator_free(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		lttng_heap_free(&chan->iter.heap);
}

int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
	return lib_ring_buffer_open_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);

/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);

int channel_iterator_open(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int ret = 0, cpu;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		/* Allow CPU hotplug to keep track of opened reader */
		chan->iter.read_open = 1;
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			ret = lib_ring_buffer_iterator_open(buf);
			if (ret)
				goto error;
			buf->iter.read_open = 1;
		}
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		ret = lib_ring_buffer_iterator_open(buf);
	}
	return ret;
error:
	/* Error should always happen on CPU 0, hence no close is required. */
	CHAN_WARN_ON(chan, cpu != 0);
	put_online_cpus();
	return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);

void channel_iterator_release(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			if (buf->iter.read_open) {
				lib_ring_buffer_iterator_release(buf);
				buf->iter.read_open = 0;
			}
		}
		chan->iter.read_open = 0;
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_release(buf);
	}
}
EXPORT_SYMBOL_GPL(channel_iterator_release);
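
/*
 * Illustrative sketch (assuming "chan" uses RING_BUFFER_ITERATOR output):
 * typical iterator lifecycle on a channel, reading a single record between
 * open and release.
 */
static __maybe_unused
ssize_t channel_read_one_record_example(struct channel *chan)
{
	struct lib_ring_buffer *buf;
	ssize_t len;
	int ret;

	ret = channel_iterator_open(chan);
	if (ret)
		return ret;
	len = channel_get_next_record(chan, &buf);
	channel_iterator_release(chan);
	return len;
}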

void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;

	if (buf->iter.state != ITER_GET_SUBBUF)
		lib_ring_buffer_put_next_subbuf(buf);
	buf->iter.state = ITER_GET_SUBBUF;
	/* Remove from heap (if present). */
	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
	buf->iter.timestamp = 0;
	buf->iter.header_len = 0;
	buf->iter.payload_len = 0;
	buf->iter.consumed = 0;
	buf->iter.read_offset = 0;
	buf->iter.data_size = 0;
	/* Don't reset allocated and read_open */
}

void channel_iterator_reset(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	/* Empty heap, put into empty_head */
	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		lib_ring_buffer_iterator_reset(buf);
	}
	/* Don't reset read_open */
	chan->iter.last_qs = 0;
	chan->iter.last_timestamp = 0;
	chan->iter.last_cpu = 0;
	chan->iter.len_left = 0;
}

/*
 * Ring buffer payload extraction read() implementation.
 */
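/*
 * For example, reading a 24-byte record payload with a 16-byte read():
 * the first call copies 16 bytes, sets chan->iter.len_left to 8 and *ppos
 * to the offset of the remaining bytes; the next call takes the
 * "skip_get_next" path below, copies the last 8 bytes, then moves on to
 * the following record.
 */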
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
		char __user *user_buf,
		size_t count,
		loff_t *ppos,
		struct channel *chan,
		struct lib_ring_buffer *buf,
		int fusionmerge)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	size_t read_count = 0, read_offset;
	ssize_t len;

	might_sleep();
	if (!access_ok(VERIFY_WRITE, user_buf, count))
		return -EFAULT;

	/* Finish copy of previous record */
	if (*ppos != 0) {
		if (read_count < count) {
			len = chan->iter.len_left;
			read_offset = *ppos;
			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
			    && fusionmerge)
				buf = lttng_heap_maximum(&chan->iter.heap);
			CHAN_WARN_ON(chan, !buf);
			goto skip_get_next;
		}
	}

	while (read_count < count) {
		size_t copy_len, space_left;

		if (fusionmerge)
			len = channel_get_next_record(chan, &buf);
		else
			len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
		if (len < 0) {
			/*
			 * Check if buffer is finalized (end of file).
			 */
			if (len == -ENODATA) {
				/* A 0 read_count will tell about end of file */
				goto nodata;
			}
			if (filp->f_flags & O_NONBLOCK) {
				if (!read_count)
					read_count = -EAGAIN;
				goto nodata;
			} else {
				int error;

				/*
				 * No data available at the moment, return what
				 * we got.
				 */
				if (read_count)
					goto nodata;

				/*
				 * Wait for returned len to be >= 0 or -ENODATA.
				 */
				if (fusionmerge)
					error = wait_event_interruptible(
						chan->read_wait,
						((len = channel_get_next_record(chan,
							&buf)), len != -EAGAIN));
				else
					error = wait_event_interruptible(
						buf->read_wait,
						((len = lib_ring_buffer_get_next_record(
							chan, buf)), len != -EAGAIN));
				CHAN_WARN_ON(chan, len == -EBUSY);
				if (error) {
					read_count = error;
					goto nodata;
				}
				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
				goto len_test;
			}
		}
		read_offset = buf->iter.read_offset;
skip_get_next:
		space_left = count - read_count;
		if (len <= space_left) {
			copy_len = len;
			chan->iter.len_left = 0;
			*ppos = 0;
		} else {
			copy_len = space_left;
			chan->iter.len_left = len - copy_len;
			*ppos = read_offset + copy_len;
		}
		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
						   &user_buf[read_count],
						   copy_len)) {
			/*
			 * Leave the len_left and ppos values at their current
			 * state, as we currently have a valid event to read.
			 */
			return -EFAULT;
		}
		read_count += copy_len;
	}
	return read_count;

nodata:
	*ppos = 0;
	chan->iter.len_left = 0;
	return read_count;
}

/**
 * lib_ring_buffer_file_read - Read buffer record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t lib_ring_buffer_file_read(struct file *filp,
		char __user *user_buf,
		size_t count,
		loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct lib_ring_buffer *buf = inode->i_private;
	struct channel *chan = buf->backend.chan;

	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
					     chan, buf, 0);
}

/**
 * channel_file_read - Read channel record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t channel_file_read(struct file *filp,
		char __user *user_buf,
		size_t count,
		loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct channel *chan = inode->i_private;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, NULL, 1);
	else {
		struct lib_ring_buffer *buf =
			channel_get_ring_buffer(config, chan, 0);
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, buf, 0);
	}
}

static
int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;
	int ret;

	ret = lib_ring_buffer_iterator_open(buf);
	if (ret)
		return ret;

	file->private_data = buf;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	lib_ring_buffer_iterator_release(buf);
	return ret;
}

static
int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;

	lib_ring_buffer_iterator_release(buf);
	return 0;
}

static
int channel_file_open(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;
	int ret;

	ret = channel_iterator_open(chan);
	if (ret)
		return ret;

	file->private_data = chan;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	channel_iterator_release(chan);
	return ret;
}

static
int channel_file_release(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;

	channel_iterator_release(chan);
	return 0;
}

const struct file_operations channel_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = channel_file_open,
	.release = channel_file_release,
	.read = channel_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);

const struct file_operations lib_ring_buffer_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = lib_ring_buffer_file_open,
	.release = lib_ring_buffer_file_release,
	.read = lib_ring_buffer_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
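
/*
 * Illustrative sketch (assuming <linux/debugfs.h> is included): these file
 * operations expect the channel (or buffer) pointer in inode->i_private,
 * which debugfs_create_file() sets from its "data" argument. A hypothetical
 * way to expose a channel's fusion-merged payload stream:
 */
static __maybe_unused
struct dentry *channel_payload_debugfs_example(struct channel *chan,
					       struct dentry *parent)
{
	return debugfs_create_file("payload", 0400, parent, chan,
				   &channel_payload_file_operations);
}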