Fix: Remove 'type' argument from access_ok() function (v5.0)
[lttng-modules.git] / lib/ringbuffer/ring_buffer_iterator.c
/* SPDX-License-Identifier: (GPL-2.0 OR LGPL-2.1)
 *
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */

#include <wrapper/ringbuffer/iterator.h>
#include <wrapper/file.h>
#include <wrapper/uaccess.h>
#include <linux/jiffies.h>
#include <linux/delay.h>
#include <linux/module.h>

/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)

/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
 * buffer is empty and finalized. The buffer must already be opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
					struct lib_ring_buffer *buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer_iter *iter = &buf->iter;
	int ret;

restart:
	switch (iter->state) {
	case ITER_GET_SUBBUF:
		ret = lib_ring_buffer_get_next_subbuf(buf);
		if (ret && !READ_ONCE(buf->finalized)
		    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
			/*
			 * Use "pull" scheme for global buffers. The reader
			 * itself flushes the buffer to "pull" data not visible
			 * to readers yet. Flush current subbuffer and re-try.
			 *
			 * Per-CPU buffers instead use a "push" scheme, because
			 * the IPI needed to flush all CPUs' buffers is too
			 * costly. In the "push" scheme, the reader waits for
			 * the writer periodic timer to flush the
			 * buffers (keeping track of a quiescent state
			 * timestamp). Therefore, the writer "pushes" data out
			 * of the buffers rather than letting the reader "pull"
			 * data from the buffer.
			 */
			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
			ret = lib_ring_buffer_get_next_subbuf(buf);
		}
		if (ret)
			return ret;
		iter->consumed = buf->cons_snapshot;
		iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
		iter->read_offset = iter->consumed;
		/* skip header */
		iter->read_offset += config->cb.subbuffer_header_size();
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_TEST_RECORD:
		if (iter->read_offset - iter->consumed >= iter->data_size) {
			iter->state = ITER_PUT_SUBBUF;
		} else {
			CHAN_WARN_ON(chan, !config->cb.record_get);
			config->cb.record_get(config, chan, buf,
					      iter->read_offset,
					      &iter->header_len,
					      &iter->payload_len,
					      &iter->timestamp);
			iter->read_offset += iter->header_len;
			subbuffer_consume_record(config, &buf->backend);
			iter->state = ITER_NEXT_RECORD;
			return iter->payload_len;
		}
		goto restart;
	case ITER_NEXT_RECORD:
		iter->read_offset += iter->payload_len;
		iter->state = ITER_TEST_RECORD;
		goto restart;
	case ITER_PUT_SUBBUF:
		lib_ring_buffer_put_next_subbuf(buf);
		iter->state = ITER_GET_SUBBUF;
		goto restart;
	default:
		CHAN_WARN_ON(chan, 1);	/* Should not happen */
		return -EPERM;
	}
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);

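/*
 * Comparison callback for the channel's lttng_ptr_heap of per-CPU buffers:
 * buffers holding the oldest (lowest) record timestamp are kept at the top,
 * so lttng_heap_maximum() returns the buffer whose record must be delivered
 * next to preserve global event ordering.
 */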
static int buf_is_higher(void *a, void *b)
{
	struct lib_ring_buffer *bufa = a;
	struct lib_ring_buffer *bufb = b;

	/* Consider lowest timestamps to be at the top of the heap */
	return (bufa->iter.timestamp < bufb->iter.timestamp);
}

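/*
 * Try to fetch a record from each buffer currently sitting on the "empty"
 * list. Buffers which now have data are moved into the heap; finalized
 * buffers are dropped from the list; buffers still empty stay on the list.
 */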
static
void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
					   struct channel *chan)
{
	struct lttng_ptr_heap *heap = &chan->iter.heap;
	struct lib_ring_buffer *buf, *tmp;
	ssize_t len;

	list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
				 iter.empty_node) {
		len = lib_ring_buffer_get_next_record(chan, buf);

		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 * -EBUSY should never happen, because we support only one
		 * reader.
		 */
		switch (len) {
		case -EAGAIN:
			/* Keep node in empty list */
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Don't add to list of empty
			 * buffers, because it has no more data to provide,
			 * ever.
			 */
			list_del(&buf->iter.empty_node);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Insert buffer into the heap, remove from empty
			 * buffer list.
			 */
			CHAN_WARN_ON(chan, len < 0);
			list_del(&buf->iter.empty_node);
			CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
		}
	}
}

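/*
 * Establish a new quiescent state timestamp for the channel: re-check the
 * empty buffers, and if some are still empty, sleep long enough for the
 * writer's switch timer (plus MAX_SYSTEM_LATENCY) to have flushed them, then
 * check them again. Buffers still empty after that cannot hold a record older
 * than the clock snapshot taken before waiting, so chan->iter.last_qs is
 * advanced to that snapshot.
 */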
static
void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
				 struct channel *chan)
{
	u64 timestamp_qs;
	unsigned long wait_msecs;

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
	/*
	 * We need to consider previously empty buffers.
	 * Do a get next buf record on each of them. Add them to
	 * the heap if they have data. If at least one of them
	 * doesn't have data, we need to wait for
	 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
	 * buffers have been switched either by the timer or idle entry) and
	 * check them again, adding them if they have data.
	 */
	lib_ring_buffer_get_empty_buf_records(config, chan);

	/*
	 * No need to wait if no empty buffers are present.
	 */
	if (list_empty(&chan->iter.empty_head))
		return;

	/*
	 * We need to wait for the buffer switch timer to run. If the
	 * CPU is idle, idle entry performed the switch.
	 * TODO: we could optimize further by skipping the sleep if all
	 * empty buffers belong to idle or offline cpus.
	 */
	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
	wait_msecs += MAX_SYSTEM_LATENCY;
	msleep(wait_msecs);
	lib_ring_buffer_get_empty_buf_records(config, chan);
	/*
	 * Any buffer still in the empty list here cannot possibly
	 * contain an event with a timestamp prior to "timestamp_qs".
	 * The new quiescent state timestamp is the one we grabbed
	 * before waiting for buffer data. It is therefore safe to
	 * ignore empty buffers up to last_qs timestamp for fusion
	 * merge.
	 */
	chan->iter.last_qs = timestamp_qs;
}

/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of new current event, -EAGAIN if all buffers are empty,
 * -ENODATA if all buffers are empty and finalized. The channel must already be
 * opened for reading.
 */
ssize_t channel_get_next_record(struct channel *chan,
				struct lib_ring_buffer **ret_buf)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	struct lttng_ptr_heap *heap;
	ssize_t len;

	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
		*ret_buf = channel_get_ring_buffer(config, chan, 0);
		return lib_ring_buffer_get_next_record(chan, *ret_buf);
	}

	heap = &chan->iter.heap;

	/*
	 * Get next record for topmost buffer.
	 */
	buf = lttng_heap_maximum(heap);
	if (buf) {
		len = lib_ring_buffer_get_next_record(chan, buf);
		/*
		 * Deal with -EAGAIN and -ENODATA.
		 * len >= 0 means record contains data.
		 */
		switch (len) {
		case -EAGAIN:
			buf->iter.timestamp = 0;
			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
			/* Remove topmost buffer from the heap */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -ENODATA:
			/*
			 * Buffer is finalized. Remove buffer from heap and
			 * don't add to list of empty buffers, because it has
			 * no more data to provide, ever.
			 */
			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
			break;
		case -EBUSY:
			CHAN_WARN_ON(chan, 1);
			break;
		default:
			/*
			 * Reinsert buffer into the heap. Note that heap can be
			 * partially empty, so we need to use
			 * lttng_heap_replace_max().
			 */
			CHAN_WARN_ON(chan, len < 0);
			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
			break;
		}
	}

	buf = lttng_heap_maximum(heap);
	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
		/*
		 * Deal with buffers previously showing no data.
		 * Add buffers containing data to the heap, update
		 * last_qs.
		 */
		lib_ring_buffer_wait_for_qs(config, chan);
	}

	*ret_buf = buf = lttng_heap_maximum(heap);
	if (buf) {
		/*
		 * If this warning triggers, you probably need to check your
		 * system interrupt latency. Typical causes: too much printk()
		 * output going to a serial console with interrupts off.
		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
		 * Observed on SMP KVM setups with trace_clock().
		 */
		if (chan->iter.last_timestamp
		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
			printk(KERN_WARNING "ring_buffer: timestamps going "
			       "backward. Last time %llu ns, cpu %d, "
			       "current time %llu ns, cpu %d, "
			       "delta %llu ns.\n",
			       chan->iter.last_timestamp, chan->iter.last_cpu,
			       buf->iter.timestamp, buf->backend.cpu,
			       chan->iter.last_timestamp - buf->iter.timestamp);
			CHAN_WARN_ON(chan, 1);
		}
		chan->iter.last_timestamp = buf->iter.timestamp;
		chan->iter.last_cpu = buf->backend.cpu;
		return buf->iter.payload_len;
	} else {
		/* Heap is empty */
		if (list_empty(&chan->iter.empty_head))
			return -ENODATA;	/* All buffers finalized */
		else
			return -EAGAIN;		/* Temporarily empty */
	}
}
EXPORT_SYMBOL_GPL(channel_get_next_record);

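/*
 * Illustrative consumer loop (sketch only, not part of this file's code):
 * once channel_iterator_open() has succeeded, records can be drained in
 * global timestamp order with channel_get_next_record():
 *
 *	struct lib_ring_buffer *buf;
 *	ssize_t len;
 *
 *	for (;;) {
 *		len = channel_get_next_record(chan, &buf);
 *		if (len == -ENODATA)
 *			break;		(all buffers empty and finalized)
 *		if (len == -EAGAIN)
 *			continue;	(temporarily empty; caller may wait)
 *		... "len" payload bytes start at buf->iter.read_offset in
 *		    buf->backend, see channel_ring_buffer_file_read() ...
 *	}
 */

/*
 * Lazily initialize the iterator state of a buffer: open it for reading if
 * the channel iterator is already open, and, for per-CPU channels, add it to
 * the list of buffers without any current record.
 */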
static
void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
{
	if (buf->iter.allocated)
		return;

	buf->iter.allocated = 1;
	if (chan->iter.read_open && !buf->iter.read_open) {
		CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
		buf->iter.read_open = 1;
	}

	/* Add to list of buffers without any current record */
	if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
}

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))

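/*
 * CPU hotplug "online" callback (kernels >= 4.10): set up the iterator state
 * of the newly onlined CPU's buffer so it is picked up by the channel
 * iterator.
 */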
int lttng_cpuhp_rb_iter_online(unsigned int cpu,
		struct lttng_cpuhp_node *node)
{
	struct channel *chan = container_of(node, struct channel,
					    cpuhp_iter_online);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	lib_ring_buffer_iterator_init(chan, buf);
	return 0;
}
EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);

#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

#ifdef CONFIG_HOTPLUG_CPU
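/*
 * Legacy CPU hotplug notifier (kernels < 4.10): same purpose as
 * lttng_cpuhp_rb_iter_online() above, for CPUs coming online (or failing to
 * go offline).
 */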
static
int channel_iterator_cpu_hotplug(struct notifier_block *nb,
				 unsigned long action,
				 void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel *chan = container_of(nb, struct channel,
					    hp_iter_notifier);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif

#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */

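/*
 * Initialize the channel iterator state: for per-CPU channels, set up the
 * empty-buffer list and the timestamp heap, register for CPU hotplug events,
 * and initialize the iterator of every currently available buffer; for global
 * channels, only initialize the single buffer's iterator.
 */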
int channel_iterator_init(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		int ret;

		INIT_LIST_HEAD(&chan->iter.empty_head);
		ret = lttng_heap_init(&chan->iter.heap,
				      num_possible_cpus(),
				      GFP_KERNEL, buf_is_higher);
		if (ret)
			return ret;

#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
		ret = cpuhp_state_add_instance(lttng_rb_hp_online,
					       &chan->cpuhp_iter_online.node);
		if (ret)
			return ret;
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		{
			int cpu;

			/*
			 * In case of non-hotplug cpu, if the ring-buffer is allocated
			 * in early initcall, it will not be notified of secondary cpus.
			 * In that case, we need to allocate for all possible cpus.
			 */
#ifdef CONFIG_HOTPLUG_CPU
			chan->hp_iter_notifier.notifier_call =
				channel_iterator_cpu_hotplug;
			chan->hp_iter_notifier.priority = 10;
			register_cpu_notifier(&chan->hp_iter_notifier);

			get_online_cpus();
			for_each_online_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
			chan->hp_iter_enable = 1;
			put_online_cpus();
#else
			for_each_possible_cpu(cpu) {
				buf = per_cpu_ptr(chan->backend.buf, cpu);
				lib_ring_buffer_iterator_init(chan, buf);
			}
#endif
		}
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_init(chan, buf);
	}
	return 0;
}

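/*
 * Tear down the CPU hotplug registration performed by channel_iterator_init()
 * for per-CPU channels.
 */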
void channel_iterator_unregister_notifiers(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
		{
			int ret;

			ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
							  &chan->cpuhp_iter_online.node);
			WARN_ON(ret);
		}
#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
		chan->hp_iter_enable = 0;
		unregister_cpu_notifier(&chan->hp_iter_notifier);
#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
	}
}

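/* Free the per-CPU timestamp heap allocated by channel_iterator_init(). */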
void channel_iterator_free(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		lttng_heap_free(&chan->iter.heap);
}

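/*
 * Open a single buffer for iterator access. Only valid for channels created
 * with the RING_BUFFER_ITERATOR output type.
 */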
int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
	return lib_ring_buffer_open_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);

/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);

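/*
 * Open every buffer of the channel for iterator access. For per-CPU channels,
 * chan->iter.read_open is set first so that buffers brought up later by CPU
 * hotplug are opened as well (see lib_ring_buffer_iterator_init()).
 */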
int channel_iterator_open(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int ret = 0, cpu;

	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		/* Allow CPU hotplug to keep track of opened reader */
		chan->iter.read_open = 1;
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			ret = lib_ring_buffer_iterator_open(buf);
			if (ret)
				goto error;
			buf->iter.read_open = 1;
		}
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		ret = lib_ring_buffer_iterator_open(buf);
	}
	return ret;
error:
	/* Error should always happen on CPU 0, hence no close is required. */
	CHAN_WARN_ON(chan, cpu != 0);
	put_online_cpus();
	return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);

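/*
 * Release every buffer of the channel that was opened by
 * channel_iterator_open().
 */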
void channel_iterator_release(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		get_online_cpus();
		for_each_channel_cpu(cpu, chan) {
			buf = channel_get_ring_buffer(config, chan, cpu);
			if (buf->iter.read_open) {
				lib_ring_buffer_iterator_release(buf);
				buf->iter.read_open = 0;
			}
		}
		chan->iter.read_open = 0;
		put_online_cpus();
	} else {
		buf = channel_get_ring_buffer(config, chan, 0);
		lib_ring_buffer_iterator_release(buf);
	}
}
EXPORT_SYMBOL_GPL(channel_iterator_release);

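/*
 * Reset a single buffer iterator to its initial state: release any held
 * sub-buffer, remove the buffer from the heap and put it back on the empty
 * list. The "allocated" and "read_open" flags are preserved.
 */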
void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
{
	struct channel *chan = buf->backend.chan;

	if (buf->iter.state != ITER_GET_SUBBUF)
		lib_ring_buffer_put_next_subbuf(buf);
	buf->iter.state = ITER_GET_SUBBUF;
	/* Remove from heap (if present). */
	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
	buf->iter.timestamp = 0;
	buf->iter.header_len = 0;
	buf->iter.payload_len = 0;
	buf->iter.consumed = 0;
	buf->iter.read_offset = 0;
	buf->iter.data_size = 0;
	/* Don't reset allocated and read_open */
}

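/*
 * Reset the whole channel iterator: move every buffer back to the empty list,
 * reset each buffer iterator, and clear the channel-level quiescent state and
 * last-timestamp tracking.
 */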
void channel_iterator_reset(struct channel *chan)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	struct lib_ring_buffer *buf;
	int cpu;

	/* Empty heap, put into empty_head */
	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
		list_add(&buf->iter.empty_node, &chan->iter.empty_head);

	for_each_channel_cpu(cpu, chan) {
		buf = channel_get_ring_buffer(config, chan, cpu);
		lib_ring_buffer_iterator_reset(buf);
	}
	/* Don't reset read_open */
	chan->iter.last_qs = 0;
	chan->iter.last_timestamp = 0;
	chan->iter.last_cpu = 0;
	chan->iter.len_left = 0;
}

/*
 * Ring buffer payload extraction read() implementation.
 */
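/*
 * Common helper for both file read paths below. When "fusionmerge" is
 * non-zero, records are pulled from the whole per-CPU channel through
 * channel_get_next_record() (heap-based timestamp merge); otherwise they are
 * read from the single buffer passed in. *ppos and chan->iter.len_left keep
 * track of a partially copied record between read() calls.
 */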
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
				      char __user *user_buf,
				      size_t count,
				      loff_t *ppos,
				      struct channel *chan,
				      struct lib_ring_buffer *buf,
				      int fusionmerge)
{
	const struct lib_ring_buffer_config *config = &chan->backend.config;
	size_t read_count = 0, read_offset;
	ssize_t len;

	might_sleep();
	if (!lttng_access_ok(VERIFY_WRITE, user_buf, count))
		return -EFAULT;

	/* Finish copy of previous record */
	if (*ppos != 0) {
		if (read_count < count) {
			len = chan->iter.len_left;
			read_offset = *ppos;
			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
			    && fusionmerge)
				buf = lttng_heap_maximum(&chan->iter.heap);
			CHAN_WARN_ON(chan, !buf);
			goto skip_get_next;
		}
	}

	while (read_count < count) {
		size_t copy_len, space_left;

		if (fusionmerge)
			len = channel_get_next_record(chan, &buf);
		else
			len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
		if (len < 0) {
			/*
			 * Check if buffer is finalized (end of file).
			 */
			if (len == -ENODATA) {
				/* A 0 read_count will tell about end of file */
				goto nodata;
			}
			if (filp->f_flags & O_NONBLOCK) {
				if (!read_count)
					read_count = -EAGAIN;
				goto nodata;
			} else {
				int error;

				/*
				 * No data available at the moment, return what
				 * we got.
				 */
				if (read_count)
					goto nodata;

				/*
				 * Wait for returned len to be >= 0 or -ENODATA.
				 */
				if (fusionmerge)
					error = wait_event_interruptible(
						chan->read_wait,
						((len = channel_get_next_record(chan,
							&buf)), len != -EAGAIN));
				else
					error = wait_event_interruptible(
						buf->read_wait,
						((len = lib_ring_buffer_get_next_record(
							chan, buf)), len != -EAGAIN));
				CHAN_WARN_ON(chan, len == -EBUSY);
				if (error) {
					read_count = error;
					goto nodata;
				}
				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
				goto len_test;
			}
		}
		read_offset = buf->iter.read_offset;
skip_get_next:
		space_left = count - read_count;
		if (len <= space_left) {
			copy_len = len;
			chan->iter.len_left = 0;
			*ppos = 0;
		} else {
			copy_len = space_left;
			chan->iter.len_left = len - copy_len;
			*ppos = read_offset + copy_len;
		}
		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
						   &user_buf[read_count],
						   copy_len)) {
			/*
			 * Leave the len_left and ppos values at their current
			 * state, as we currently have a valid event to read.
			 */
			return -EFAULT;
		}
		read_count += copy_len;
	}
	return read_count;

nodata:
	*ppos = 0;
	chan->iter.len_left = 0;
	return read_count;
}

/**
 * lib_ring_buffer_file_read - Read buffer record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t lib_ring_buffer_file_read(struct file *filp,
				  char __user *user_buf,
				  size_t count,
				  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct lib_ring_buffer *buf = inode->i_private;
	struct channel *chan = buf->backend.chan;

	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
					     chan, buf, 0);
}

/**
 * channel_file_read - Read channel record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t channel_file_read(struct file *filp,
			  char __user *user_buf,
			  size_t count,
			  loff_t *ppos)
{
	struct inode *inode = filp->lttng_f_dentry->d_inode;
	struct channel *chan = inode->i_private;
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, NULL, 1);
	else {
		struct lib_ring_buffer *buf =
			channel_get_ring_buffer(config, chan, 0);

		return channel_ring_buffer_file_read(filp, user_buf, count,
						     ppos, chan, buf, 0);
	}
}

static
int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;
	int ret;

	ret = lib_ring_buffer_iterator_open(buf);
	if (ret)
		return ret;

	file->private_data = buf;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	lib_ring_buffer_iterator_release(buf);
	return ret;
}

static
int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
{
	struct lib_ring_buffer *buf = inode->i_private;

	lib_ring_buffer_iterator_release(buf);
	return 0;
}

static
int channel_file_open(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;
	int ret;

	ret = channel_iterator_open(chan);
	if (ret)
		return ret;

	file->private_data = chan;
	ret = nonseekable_open(inode, file);
	if (ret)
		goto release_iter;
	return 0;

release_iter:
	channel_iterator_release(chan);
	return ret;
}

static
int channel_file_release(struct inode *inode, struct file *file)
{
	struct channel *chan = inode->i_private;

	channel_iterator_release(chan);
	return 0;
}

const struct file_operations channel_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = channel_file_open,
	.release = channel_file_release,
	.read = channel_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);

const struct file_operations lib_ring_buffer_payload_file_operations = {
	.owner = THIS_MODULE,
	.open = lib_ring_buffer_file_open,
	.release = lib_ring_buffer_file_release,
	.read = lib_ring_buffer_file_read,
	.llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);