fix: cpu/hotplug: Remove deprecated CPU-hotplug functions. (v5.15)
[lttng-modules.git] / lib / ringbuffer / ring_buffer_iterator.c
1 /* SPDX-License-Identifier: (GPL-2.0 OR LGPL-2.1)
2 *
3 * ring_buffer_iterator.c
4 *
5 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
6 * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
7 * complexity for the "get next event" operation.
8 *
9 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 */
11
12 #include <wrapper/ringbuffer/iterator.h>
13 #include <wrapper/cpu.h>
14 #include <wrapper/file.h>
15 #include <wrapper/uaccess.h>
16 #include <linux/jiffies.h>
17 #include <linux/delay.h>
18 #include <linux/module.h>
19
20 /*
21 * Safety factor taking into account internal kernel interrupt latency.
22 * Assuming 250ms worst-case latency.
23 */
24 #define MAX_SYSTEM_LATENCY 250
25
26 /*
27 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
28 */
29 #define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000)
30
31 /**
32 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
33 * @chan: channel
34 * @buf: buffer
35 *
36 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
37 * buffer is empty and finalized. The buffer must already be opened for reading.
38 */
39 ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
40 struct lib_ring_buffer *buf)
41 {
42 const struct lib_ring_buffer_config *config = &chan->backend.config;
43 struct lib_ring_buffer_iter *iter = &buf->iter;
44 int ret;
45
46 restart:
47 switch (iter->state) {
48 case ITER_GET_SUBBUF:
49 ret = lib_ring_buffer_get_next_subbuf(buf);
50 if (ret && !LTTNG_READ_ONCE(buf->finalized)
51 && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
52 /*
53 * Use "pull" scheme for global buffers. The reader
54 * itself flushes the buffer to "pull" data not visible
55 * to readers yet. Flush current subbuffer and re-try.
56 *
57 * Per-CPU buffers rather use a "push" scheme because
58 * the IPI needed to flush all CPU's buffers is too
59 * costly. In the "push" scheme, the reader waits for
60 * the writer periodic timer to flush the
61 * buffers (keeping track of a quiescent state
62 * timestamp). Therefore, the writer "pushes" data out
63 * of the buffers rather than letting the reader "pull"
64 * data from the buffer.
65 */
66 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
67 ret = lib_ring_buffer_get_next_subbuf(buf);
68 }
69 if (ret)
70 return ret;
71 iter->consumed = buf->cons_snapshot;
72 iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
73 iter->read_offset = iter->consumed;
74 /* skip header */
75 iter->read_offset += config->cb.subbuffer_header_size();
76 iter->state = ITER_TEST_RECORD;
77 goto restart;
78 case ITER_TEST_RECORD:
79 if (iter->read_offset - iter->consumed >= iter->data_size) {
80 iter->state = ITER_PUT_SUBBUF;
81 } else {
82 CHAN_WARN_ON(chan, !config->cb.record_get);
83 config->cb.record_get(config, chan, buf,
84 iter->read_offset,
85 &iter->header_len,
86 &iter->payload_len,
87 &iter->timestamp);
88 iter->read_offset += iter->header_len;
89 subbuffer_consume_record(config, &buf->backend);
90 iter->state = ITER_NEXT_RECORD;
91 return iter->payload_len;
92 }
93 goto restart;
94 case ITER_NEXT_RECORD:
95 iter->read_offset += iter->payload_len;
96 iter->state = ITER_TEST_RECORD;
97 goto restart;
98 case ITER_PUT_SUBBUF:
99 lib_ring_buffer_put_next_subbuf(buf);
100 iter->state = ITER_GET_SUBBUF;
101 goto restart;
102 default:
103 CHAN_WARN_ON(chan, 1); /* Should not happen */
104 return -EPERM;
105 }
106 }
107 EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
108
109 static int buf_is_higher(void *a, void *b)
110 {
111 struct lib_ring_buffer *bufa = a;
112 struct lib_ring_buffer *bufb = b;
113
114 /* Consider lowest timestamps to be at the top of the heap */
115 return (bufa->iter.timestamp < bufb->iter.timestamp);
116 }
117
118 static
119 void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
120 struct channel *chan)
121 {
122 struct lttng_ptr_heap *heap = &chan->iter.heap;
123 struct lib_ring_buffer *buf, *tmp;
124 ssize_t len;
125
126 list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
127 iter.empty_node) {
128 len = lib_ring_buffer_get_next_record(chan, buf);
129
130 /*
131 * Deal with -EAGAIN and -ENODATA.
132 * len >= 0 means record contains data.
133 * -EBUSY should never happen, because we support only one
134 * reader.
135 */
136 switch (len) {
137 case -EAGAIN:
138 /* Keep node in empty list */
139 break;
140 case -ENODATA:
141 /*
142 * Buffer is finalized. Don't add to list of empty
143 * buffer, because it has no more data to provide, ever.
144 */
145 list_del(&buf->iter.empty_node);
146 break;
147 case -EBUSY:
148 CHAN_WARN_ON(chan, 1);
149 break;
150 default:
151 /*
152 * Insert buffer into the heap, remove from empty buffer
153 * list.
154 */
155 CHAN_WARN_ON(chan, len < 0);
156 list_del(&buf->iter.empty_node);
157 CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
158 }
159 }
160 }
161
162 static
163 void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
164 struct channel *chan)
165 {
166 u64 timestamp_qs;
167 unsigned long wait_msecs;
168
169 /*
170 * No need to wait if no empty buffers are present.
171 */
172 if (list_empty(&chan->iter.empty_head))
173 return;
174
175 timestamp_qs = config->cb.ring_buffer_clock_read(chan);
176 /*
177 * We need to consider previously empty buffers.
178 * Do a get next buf record on each of them. Add them to
179 * the heap if they have data. If at least one of them
180 * don't have data, we need to wait for
181 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
182 * buffers have been switched either by the timer or idle entry) and
183 * check them again, adding them if they have data.
184 */
185 lib_ring_buffer_get_empty_buf_records(config, chan);
186
187 /*
188 * No need to wait if no empty buffers are present.
189 */
190 if (list_empty(&chan->iter.empty_head))
191 return;
192
193 /*
194 * We need to wait for the buffer switch timer to run. If the
195 * CPU is idle, idle entry performed the switch.
196 * TODO: we could optimize further by skipping the sleep if all
197 * empty buffers belong to idle or offline cpus.
198 */
199 wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
200 wait_msecs += MAX_SYSTEM_LATENCY;
201 msleep(wait_msecs);
202 lib_ring_buffer_get_empty_buf_records(config, chan);
203 /*
204 * Any buffer still in the empty list here cannot possibly
205 * contain an event with a timestamp prior to "timestamp_qs".
206 * The new quiescent state timestamp is the one we grabbed
207 * before waiting for buffer data. It is therefore safe to
208 * ignore empty buffers up to last_qs timestamp for fusion
209 * merge.
210 */
211 chan->iter.last_qs = timestamp_qs;
212 }
213
214 /**
215 * channel_get_next_record - Get the next record in a channel.
216 * @chan: channel
217 * @ret_buf: the buffer in which the event is located (output)
218 *
219 * Returns the size of new current event, -EAGAIN if all buffers are empty,
220 * -ENODATA if all buffers are empty and finalized. The channel must already be
221 * opened for reading.
222 */
223
224 ssize_t channel_get_next_record(struct channel *chan,
225 struct lib_ring_buffer **ret_buf)
226 {
227 const struct lib_ring_buffer_config *config = &chan->backend.config;
228 struct lib_ring_buffer *buf;
229 struct lttng_ptr_heap *heap;
230 ssize_t len;
231
232 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
233 *ret_buf = channel_get_ring_buffer(config, chan, 0);
234 return lib_ring_buffer_get_next_record(chan, *ret_buf);
235 }
236
237 heap = &chan->iter.heap;
238
239 /*
240 * get next record for topmost buffer.
241 */
242 buf = lttng_heap_maximum(heap);
243 if (buf) {
244 len = lib_ring_buffer_get_next_record(chan, buf);
245 /*
246 * Deal with -EAGAIN and -ENODATA.
247 * len >= 0 means record contains data.
248 */
249 switch (len) {
250 case -EAGAIN:
251 buf->iter.timestamp = 0;
252 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
253 /* Remove topmost buffer from the heap */
254 CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
255 break;
256 case -ENODATA:
257 /*
258 * Buffer is finalized. Remove buffer from heap and
259 * don't add to list of empty buffer, because it has no
260 * more data to provide, ever.
261 */
262 CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
263 break;
264 case -EBUSY:
265 CHAN_WARN_ON(chan, 1);
266 break;
267 default:
268 /*
269 * Reinsert buffer into the heap. Note that heap can be
270 * partially empty, so we need to use
271 * lttng_heap_replace_max().
272 */
273 CHAN_WARN_ON(chan, len < 0);
274 CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
275 break;
276 }
277 }
278
279 buf = lttng_heap_maximum(heap);
280 if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
281 /*
282 * Deal with buffers previously showing no data.
283 * Add buffers containing data to the heap, update
284 * last_qs.
285 */
286 lib_ring_buffer_wait_for_qs(config, chan);
287 }
288
289 *ret_buf = buf = lttng_heap_maximum(heap);
290 if (buf) {
291 /*
292 * If this warning triggers, you probably need to check your
293 * system interrupt latency. Typical causes: too many printk()
294 * output going to a serial console with interrupts off.
295 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
296 * Observed on SMP KVM setups with trace_clock().
297 */
298 if (chan->iter.last_timestamp
299 > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
300 printk(KERN_WARNING "ring_buffer: timestamps going "
301 "backward. Last time %llu ns, cpu %d, "
302 "current time %llu ns, cpu %d, "
303 "delta %llu ns.\n",
304 chan->iter.last_timestamp, chan->iter.last_cpu,
305 buf->iter.timestamp, buf->backend.cpu,
306 chan->iter.last_timestamp - buf->iter.timestamp);
307 CHAN_WARN_ON(chan, 1);
308 }
309 chan->iter.last_timestamp = buf->iter.timestamp;
310 chan->iter.last_cpu = buf->backend.cpu;
311 return buf->iter.payload_len;
312 } else {
313 /* Heap is empty */
314 if (list_empty(&chan->iter.empty_head))
315 return -ENODATA; /* All buffers finalized */
316 else
317 return -EAGAIN; /* Temporarily empty */
318 }
319 }
320 EXPORT_SYMBOL_GPL(channel_get_next_record);
321
322 static
323 void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
324 {
325 if (buf->iter.allocated)
326 return;
327
328 buf->iter.allocated = 1;
329 if (chan->iter.read_open && !buf->iter.read_open) {
330 CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
331 buf->iter.read_open = 1;
332 }
333
334 /* Add to list of buffers without any current record */
335 if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
336 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
337 }
338
339 #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
340
341 int lttng_cpuhp_rb_iter_online(unsigned int cpu,
342 struct lttng_cpuhp_node *node)
343 {
344 struct channel *chan = container_of(node, struct channel,
345 cpuhp_iter_online);
346 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
347 const struct lib_ring_buffer_config *config = &chan->backend.config;
348
349 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
350
351 lib_ring_buffer_iterator_init(chan, buf);
352 return 0;
353 }
354 EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online);
355
356 #else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
357
358 #ifdef CONFIG_HOTPLUG_CPU
359 static
360 int channel_iterator_cpu_hotplug(struct notifier_block *nb,
361 unsigned long action,
362 void *hcpu)
363 {
364 unsigned int cpu = (unsigned long)hcpu;
365 struct channel *chan = container_of(nb, struct channel,
366 hp_iter_notifier);
367 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
368 const struct lib_ring_buffer_config *config = &chan->backend.config;
369
370 if (!chan->hp_iter_enable)
371 return NOTIFY_DONE;
372
373 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
374
375 switch (action) {
376 case CPU_DOWN_FAILED:
377 case CPU_DOWN_FAILED_FROZEN:
378 case CPU_ONLINE:
379 case CPU_ONLINE_FROZEN:
380 lib_ring_buffer_iterator_init(chan, buf);
381 return NOTIFY_OK;
382 default:
383 return NOTIFY_DONE;
384 }
385 }
386 #endif
387
388 #endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
389
390 int channel_iterator_init(struct channel *chan)
391 {
392 const struct lib_ring_buffer_config *config = &chan->backend.config;
393 struct lib_ring_buffer *buf;
394
395 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
396 int ret;
397
398 INIT_LIST_HEAD(&chan->iter.empty_head);
399 ret = lttng_heap_init(&chan->iter.heap,
400 num_possible_cpus(),
401 GFP_KERNEL, buf_is_higher);
402 if (ret)
403 return ret;
404
405 #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
406 chan->cpuhp_iter_online.component = LTTNG_RING_BUFFER_ITER;
407 ret = cpuhp_state_add_instance(lttng_rb_hp_online,
408 &chan->cpuhp_iter_online.node);
409 if (ret)
410 return ret;
411 #else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
412 {
413 int cpu;
414
415 /*
416 * In case of non-hotplug cpu, if the ring-buffer is allocated
417 * in early initcall, it will not be notified of secondary cpus.
418 * In that off case, we need to allocate for all possible cpus.
419 */
420 #ifdef CONFIG_HOTPLUG_CPU
421 chan->hp_iter_notifier.notifier_call =
422 channel_iterator_cpu_hotplug;
423 chan->hp_iter_notifier.priority = 10;
424 register_cpu_notifier(&chan->hp_iter_notifier);
425
426 lttng_cpus_read_lock();
427 for_each_online_cpu(cpu) {
428 buf = per_cpu_ptr(chan->backend.buf, cpu);
429 lib_ring_buffer_iterator_init(chan, buf);
430 }
431 chan->hp_iter_enable = 1;
432 lttng_cpus_read_unlock();
433 #else
434 for_each_possible_cpu(cpu) {
435 buf = per_cpu_ptr(chan->backend.buf, cpu);
436 lib_ring_buffer_iterator_init(chan, buf);
437 }
438 #endif
439 }
440 #endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
441 } else {
442 buf = channel_get_ring_buffer(config, chan, 0);
443 lib_ring_buffer_iterator_init(chan, buf);
444 }
445 return 0;
446 }
447
448 void channel_iterator_unregister_notifiers(struct channel *chan)
449 {
450 const struct lib_ring_buffer_config *config = &chan->backend.config;
451
452 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
453 #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
454 {
455 int ret;
456
457 ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
458 &chan->cpuhp_iter_online.node);
459 WARN_ON(ret);
460 }
461 #else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
462 chan->hp_iter_enable = 0;
463 unregister_cpu_notifier(&chan->hp_iter_notifier);
464 #endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
465 }
466 }
467
468 void channel_iterator_free(struct channel *chan)
469 {
470 const struct lib_ring_buffer_config *config = &chan->backend.config;
471
472 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
473 lttng_heap_free(&chan->iter.heap);
474 }
475
476 int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
477 {
478 struct channel *chan = buf->backend.chan;
479 const struct lib_ring_buffer_config *config = &chan->backend.config;
480 CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
481 return lib_ring_buffer_open_read(buf);
482 }
483 EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);
484
/*
 * Release the read side of a buffer that was opened for iterator reading.
 *
 * Note: an iterator can leave the buffer in "GET" state, which other output
 * types (mmap, splice, raw data read) do not expect. Iterators must
 * therefore never be mixed with those outputs on the same buffer.
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
495
496 int channel_iterator_open(struct channel *chan)
497 {
498 const struct lib_ring_buffer_config *config = &chan->backend.config;
499 struct lib_ring_buffer *buf;
500 int ret = 0, cpu;
501
502 CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
503
504 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
505 lttng_cpus_read_lock();
506 /* Allow CPU hotplug to keep track of opened reader */
507 chan->iter.read_open = 1;
508 for_each_channel_cpu(cpu, chan) {
509 buf = channel_get_ring_buffer(config, chan, cpu);
510 ret = lib_ring_buffer_iterator_open(buf);
511 if (ret)
512 goto error;
513 buf->iter.read_open = 1;
514 }
515 lttng_cpus_read_unlock();
516 } else {
517 buf = channel_get_ring_buffer(config, chan, 0);
518 ret = lib_ring_buffer_iterator_open(buf);
519 }
520 return ret;
521 error:
522 /* Error should always happen on CPU 0, hence no close is required. */
523 CHAN_WARN_ON(chan, cpu != 0);
524 lttng_cpus_read_unlock();
525 return ret;
526 }
527 EXPORT_SYMBOL_GPL(channel_iterator_open);
528
529 void channel_iterator_release(struct channel *chan)
530 {
531 const struct lib_ring_buffer_config *config = &chan->backend.config;
532 struct lib_ring_buffer *buf;
533 int cpu;
534
535 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
536 lttng_cpus_read_lock();
537 for_each_channel_cpu(cpu, chan) {
538 buf = channel_get_ring_buffer(config, chan, cpu);
539 if (buf->iter.read_open) {
540 lib_ring_buffer_iterator_release(buf);
541 buf->iter.read_open = 0;
542 }
543 }
544 chan->iter.read_open = 0;
545 lttng_cpus_read_unlock();
546 } else {
547 buf = channel_get_ring_buffer(config, chan, 0);
548 lib_ring_buffer_iterator_release(buf);
549 }
550 }
551 EXPORT_SYMBOL_GPL(channel_iterator_release);
552
553 void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
554 {
555 struct channel *chan = buf->backend.chan;
556
557 if (buf->iter.state != ITER_GET_SUBBUF)
558 lib_ring_buffer_put_next_subbuf(buf);
559 buf->iter.state = ITER_GET_SUBBUF;
560 /* Remove from heap (if present). */
561 if (lttng_heap_cherrypick(&chan->iter.heap, buf))
562 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
563 buf->iter.timestamp = 0;
564 buf->iter.header_len = 0;
565 buf->iter.payload_len = 0;
566 buf->iter.consumed = 0;
567 buf->iter.read_offset = 0;
568 buf->iter.data_size = 0;
569 /* Don't reset allocated and read_open */
570 }
571
572 void channel_iterator_reset(struct channel *chan)
573 {
574 const struct lib_ring_buffer_config *config = &chan->backend.config;
575 struct lib_ring_buffer *buf;
576 int cpu;
577
578 /* Empty heap, put into empty_head */
579 while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
580 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
581
582 for_each_channel_cpu(cpu, chan) {
583 buf = channel_get_ring_buffer(config, chan, cpu);
584 lib_ring_buffer_iterator_reset(buf);
585 }
586 /* Don't reset read_open */
587 chan->iter.last_qs = 0;
588 chan->iter.last_timestamp = 0;
589 chan->iter.last_cpu = 0;
590 chan->iter.len_left = 0;
591 }
592
593 /*
594 * Ring buffer payload extraction read() implementation.
595 */
596 static
597 ssize_t channel_ring_buffer_file_read(struct file *filp,
598 char __user *user_buf,
599 size_t count,
600 loff_t *ppos,
601 struct channel *chan,
602 struct lib_ring_buffer *buf,
603 int fusionmerge)
604 {
605 const struct lib_ring_buffer_config *config = &chan->backend.config;
606 size_t read_count = 0, read_offset;
607 ssize_t len;
608
609 might_sleep();
610 if (!lttng_access_ok(VERIFY_WRITE, user_buf, count))
611 return -EFAULT;
612
613 /* Finish copy of previous record */
614 if (*ppos != 0) {
615 if (read_count < count) {
616 len = chan->iter.len_left;
617 read_offset = *ppos;
618 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
619 && fusionmerge)
620 buf = lttng_heap_maximum(&chan->iter.heap);
621 CHAN_WARN_ON(chan, !buf);
622 goto skip_get_next;
623 }
624 }
625
626 while (read_count < count) {
627 size_t copy_len, space_left;
628
629 if (fusionmerge)
630 len = channel_get_next_record(chan, &buf);
631 else
632 len = lib_ring_buffer_get_next_record(chan, buf);
633 len_test:
634 if (len < 0) {
635 /*
636 * Check if buffer is finalized (end of file).
637 */
638 if (len == -ENODATA) {
639 /* A 0 read_count will tell about end of file */
640 goto nodata;
641 }
642 if (filp->f_flags & O_NONBLOCK) {
643 if (!read_count)
644 read_count = -EAGAIN;
645 goto nodata;
646 } else {
647 int error;
648
649 /*
650 * No data available at the moment, return what
651 * we got.
652 */
653 if (read_count)
654 goto nodata;
655
656 /*
657 * Wait for returned len to be >= 0 or -ENODATA.
658 */
659 if (fusionmerge)
660 error = wait_event_interruptible(
661 chan->read_wait,
662 ((len = channel_get_next_record(chan,
663 &buf)), len != -EAGAIN));
664 else
665 error = wait_event_interruptible(
666 buf->read_wait,
667 ((len = lib_ring_buffer_get_next_record(
668 chan, buf)), len != -EAGAIN));
669 CHAN_WARN_ON(chan, len == -EBUSY);
670 if (error) {
671 read_count = error;
672 goto nodata;
673 }
674 CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
675 goto len_test;
676 }
677 }
678 read_offset = buf->iter.read_offset;
679 skip_get_next:
680 space_left = count - read_count;
681 if (len <= space_left) {
682 copy_len = len;
683 chan->iter.len_left = 0;
684 *ppos = 0;
685 } else {
686 copy_len = space_left;
687 chan->iter.len_left = len - copy_len;
688 *ppos = read_offset + copy_len;
689 }
690 if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
691 &user_buf[read_count],
692 copy_len)) {
693 /*
694 * Leave the len_left and ppos values at their current
695 * state, as we currently have a valid event to read.
696 */
697 return -EFAULT;
698 }
699 read_count += copy_len;
700 };
701 return read_count;
702
703 nodata:
704 *ppos = 0;
705 chan->iter.len_left = 0;
706 return read_count;
707 }
708
709 /**
710 * lib_ring_buffer_file_read - Read buffer record payload.
711 * @filp: file structure pointer.
712 * @buffer: user buffer to read data into.
713 * @count: number of bytes to read.
714 * @ppos: file read position.
715 *
716 * Returns a negative value on error, or the number of bytes read on success.
717 * ppos is used to save the position _within the current record_ between calls
718 * to read().
719 */
720 static
721 ssize_t lib_ring_buffer_file_read(struct file *filp,
722 char __user *user_buf,
723 size_t count,
724 loff_t *ppos)
725 {
726 struct inode *inode = filp->lttng_f_dentry->d_inode;
727 struct lib_ring_buffer *buf = inode->i_private;
728 struct channel *chan = buf->backend.chan;
729
730 return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
731 chan, buf, 0);
732 }
733
734 /**
735 * channel_file_read - Read channel record payload.
736 * @filp: file structure pointer.
737 * @buffer: user buffer to read data into.
738 * @count: number of bytes to read.
739 * @ppos: file read position.
740 *
741 * Returns a negative value on error, or the number of bytes read on success.
742 * ppos is used to save the position _within the current record_ between calls
743 * to read().
744 */
745 static
746 ssize_t channel_file_read(struct file *filp,
747 char __user *user_buf,
748 size_t count,
749 loff_t *ppos)
750 {
751 struct inode *inode = filp->lttng_f_dentry->d_inode;
752 struct channel *chan = inode->i_private;
753 const struct lib_ring_buffer_config *config = &chan->backend.config;
754
755 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
756 return channel_ring_buffer_file_read(filp, user_buf, count,
757 ppos, chan, NULL, 1);
758 else {
759 struct lib_ring_buffer *buf =
760 channel_get_ring_buffer(config, chan, 0);
761 return channel_ring_buffer_file_read(filp, user_buf, count,
762 ppos, chan, buf, 0);
763 }
764 }
765
766 static
767 int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
768 {
769 struct lib_ring_buffer *buf = inode->i_private;
770 int ret;
771
772 ret = lib_ring_buffer_iterator_open(buf);
773 if (ret)
774 return ret;
775
776 file->private_data = buf;
777 ret = nonseekable_open(inode, file);
778 if (ret)
779 goto release_iter;
780 return 0;
781
782 release_iter:
783 lib_ring_buffer_iterator_release(buf);
784 return ret;
785 }
786
787 static
788 int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
789 {
790 struct lib_ring_buffer *buf = inode->i_private;
791
792 lib_ring_buffer_iterator_release(buf);
793 return 0;
794 }
795
796 static
797 int channel_file_open(struct inode *inode, struct file *file)
798 {
799 struct channel *chan = inode->i_private;
800 int ret;
801
802 ret = channel_iterator_open(chan);
803 if (ret)
804 return ret;
805
806 file->private_data = chan;
807 ret = nonseekable_open(inode, file);
808 if (ret)
809 goto release_iter;
810 return 0;
811
812 release_iter:
813 channel_iterator_release(chan);
814 return ret;
815 }
816
817 static
818 int channel_file_release(struct inode *inode, struct file *file)
819 {
820 struct channel *chan = inode->i_private;
821
822 channel_iterator_release(chan);
823 return 0;
824 }
825
826 const struct file_operations channel_payload_file_operations = {
827 .owner = THIS_MODULE,
828 .open = channel_file_open,
829 .release = channel_file_release,
830 .read = channel_file_read,
831 .llseek = vfs_lib_ring_buffer_no_llseek,
832 };
833 EXPORT_SYMBOL_GPL(channel_payload_file_operations);
834
835 const struct file_operations lib_ring_buffer_payload_file_operations = {
836 .owner = THIS_MODULE,
837 .open = lib_ring_buffer_file_open,
838 .release = lib_ring_buffer_file_release,
839 .read = lib_ring_buffer_file_read,
840 .llseek = vfs_lib_ring_buffer_no_llseek,
841 };
842 EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
This page took 0.057502 seconds and 4 git commands to generate.