Fix CPU hotplug section mismatches
[lttng-modules.git] / lib / ringbuffer / ring_buffer_iterator.c
1 /*
2 * ring_buffer_iterator.c
3 *
4 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
5 * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
6 * complexity for the "get next event" operation.
7 *
8 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; only
13 * version 2.1 of the License.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 *
24 * Author:
25 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
26 */
27
28 #include "../../wrapper/ringbuffer/iterator.h"
29 #include <linux/jiffies.h>
30 #include <linux/delay.h>
31 #include <linux/module.h>
32
33 /*
34 * Safety factor taking into account internal kernel interrupt latency.
35 * Assuming 250ms worse-case latency.
36 */
37 #define MAX_SYSTEM_LATENCY 250
38
39 /*
40 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
41 */
42 #define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000)
43
44 /**
45 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
46 * @chan: channel
47 * @buf: buffer
48 *
49 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
50 * buffer is empty and finalized. The buffer must already be opened for reading.
51 */
52 ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
53 struct lib_ring_buffer *buf)
54 {
55 const struct lib_ring_buffer_config *config = &chan->backend.config;
56 struct lib_ring_buffer_iter *iter = &buf->iter;
57 int ret;
58
59 restart:
60 switch (iter->state) {
61 case ITER_GET_SUBBUF:
62 ret = lib_ring_buffer_get_next_subbuf(buf);
63 if (ret && !ACCESS_ONCE(buf->finalized)
64 && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
65 /*
66 * Use "pull" scheme for global buffers. The reader
67 * itself flushes the buffer to "pull" data not visible
68 * to readers yet. Flush current subbuffer and re-try.
69 *
70 * Per-CPU buffers rather use a "push" scheme because
71 * the IPI needed to flush all CPU's buffers is too
72 * costly. In the "push" scheme, the reader waits for
73 * the writer periodic deferrable timer to flush the
74 * buffers (keeping track of a quiescent state
75 * timestamp). Therefore, the writer "pushes" data out
76 * of the buffers rather than letting the reader "pull"
77 * data from the buffer.
78 */
79 lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
80 ret = lib_ring_buffer_get_next_subbuf(buf);
81 }
82 if (ret)
83 return ret;
84 iter->consumed = buf->cons_snapshot;
85 iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
86 iter->read_offset = iter->consumed;
87 /* skip header */
88 iter->read_offset += config->cb.subbuffer_header_size();
89 iter->state = ITER_TEST_RECORD;
90 goto restart;
91 case ITER_TEST_RECORD:
92 if (iter->read_offset - iter->consumed >= iter->data_size) {
93 iter->state = ITER_PUT_SUBBUF;
94 } else {
95 CHAN_WARN_ON(chan, !config->cb.record_get);
96 config->cb.record_get(config, chan, buf,
97 iter->read_offset,
98 &iter->header_len,
99 &iter->payload_len,
100 &iter->timestamp);
101 iter->read_offset += iter->header_len;
102 subbuffer_consume_record(config, &buf->backend);
103 iter->state = ITER_NEXT_RECORD;
104 return iter->payload_len;
105 }
106 goto restart;
107 case ITER_NEXT_RECORD:
108 iter->read_offset += iter->payload_len;
109 iter->state = ITER_TEST_RECORD;
110 goto restart;
111 case ITER_PUT_SUBBUF:
112 lib_ring_buffer_put_next_subbuf(buf);
113 iter->state = ITER_GET_SUBBUF;
114 goto restart;
115 default:
116 CHAN_WARN_ON(chan, 1); /* Should not happen */
117 return -EPERM;
118 }
119 }
120 EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
121
122 static int buf_is_higher(void *a, void *b)
123 {
124 struct lib_ring_buffer *bufa = a;
125 struct lib_ring_buffer *bufb = b;
126
127 /* Consider lowest timestamps to be at the top of the heap */
128 return (bufa->iter.timestamp < bufb->iter.timestamp);
129 }
130
131 static
132 void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
133 struct channel *chan)
134 {
135 struct lttng_ptr_heap *heap = &chan->iter.heap;
136 struct lib_ring_buffer *buf, *tmp;
137 ssize_t len;
138
139 list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
140 iter.empty_node) {
141 len = lib_ring_buffer_get_next_record(chan, buf);
142
143 /*
144 * Deal with -EAGAIN and -ENODATA.
145 * len >= 0 means record contains data.
146 * -EBUSY should never happen, because we support only one
147 * reader.
148 */
149 switch (len) {
150 case -EAGAIN:
151 /* Keep node in empty list */
152 break;
153 case -ENODATA:
154 /*
155 * Buffer is finalized. Don't add to list of empty
156 * buffer, because it has no more data to provide, ever.
157 */
158 list_del(&buf->iter.empty_node);
159 break;
160 case -EBUSY:
161 CHAN_WARN_ON(chan, 1);
162 break;
163 default:
164 /*
165 * Insert buffer into the heap, remove from empty buffer
166 * list.
167 */
168 CHAN_WARN_ON(chan, len < 0);
169 list_del(&buf->iter.empty_node);
170 CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
171 }
172 }
173 }
174
175 static
176 void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
177 struct channel *chan)
178 {
179 u64 timestamp_qs;
180 unsigned long wait_msecs;
181
182 /*
183 * No need to wait if no empty buffers are present.
184 */
185 if (list_empty(&chan->iter.empty_head))
186 return;
187
188 timestamp_qs = config->cb.ring_buffer_clock_read(chan);
189 /*
190 * We need to consider previously empty buffers.
191 * Do a get next buf record on each of them. Add them to
192 * the heap if they have data. If at least one of them
193 * don't have data, we need to wait for
194 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
195 * buffers have been switched either by the timer or idle entry) and
196 * check them again, adding them if they have data.
197 */
198 lib_ring_buffer_get_empty_buf_records(config, chan);
199
200 /*
201 * No need to wait if no empty buffers are present.
202 */
203 if (list_empty(&chan->iter.empty_head))
204 return;
205
206 /*
207 * We need to wait for the buffer switch timer to run. If the
208 * CPU is idle, idle entry performed the switch.
209 * TODO: we could optimize further by skipping the sleep if all
210 * empty buffers belong to idle or offline cpus.
211 */
212 wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
213 wait_msecs += MAX_SYSTEM_LATENCY;
214 msleep(wait_msecs);
215 lib_ring_buffer_get_empty_buf_records(config, chan);
216 /*
217 * Any buffer still in the empty list here cannot possibly
218 * contain an event with a timestamp prior to "timestamp_qs".
219 * The new quiescent state timestamp is the one we grabbed
220 * before waiting for buffer data. It is therefore safe to
221 * ignore empty buffers up to last_qs timestamp for fusion
222 * merge.
223 */
224 chan->iter.last_qs = timestamp_qs;
225 }
226
227 /**
228 * channel_get_next_record - Get the next record in a channel.
229 * @chan: channel
230 * @ret_buf: the buffer in which the event is located (output)
231 *
232 * Returns the size of new current event, -EAGAIN if all buffers are empty,
233 * -ENODATA if all buffers are empty and finalized. The channel must already be
234 * opened for reading.
235 */
236
237 ssize_t channel_get_next_record(struct channel *chan,
238 struct lib_ring_buffer **ret_buf)
239 {
240 const struct lib_ring_buffer_config *config = &chan->backend.config;
241 struct lib_ring_buffer *buf;
242 struct lttng_ptr_heap *heap;
243 ssize_t len;
244
245 if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
246 *ret_buf = channel_get_ring_buffer(config, chan, 0);
247 return lib_ring_buffer_get_next_record(chan, *ret_buf);
248 }
249
250 heap = &chan->iter.heap;
251
252 /*
253 * get next record for topmost buffer.
254 */
255 buf = lttng_heap_maximum(heap);
256 if (buf) {
257 len = lib_ring_buffer_get_next_record(chan, buf);
258 /*
259 * Deal with -EAGAIN and -ENODATA.
260 * len >= 0 means record contains data.
261 */
262 switch (len) {
263 case -EAGAIN:
264 buf->iter.timestamp = 0;
265 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
266 /* Remove topmost buffer from the heap */
267 CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
268 break;
269 case -ENODATA:
270 /*
271 * Buffer is finalized. Remove buffer from heap and
272 * don't add to list of empty buffer, because it has no
273 * more data to provide, ever.
274 */
275 CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
276 break;
277 case -EBUSY:
278 CHAN_WARN_ON(chan, 1);
279 break;
280 default:
281 /*
282 * Reinsert buffer into the heap. Note that heap can be
283 * partially empty, so we need to use
284 * lttng_heap_replace_max().
285 */
286 CHAN_WARN_ON(chan, len < 0);
287 CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
288 break;
289 }
290 }
291
292 buf = lttng_heap_maximum(heap);
293 if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
294 /*
295 * Deal with buffers previously showing no data.
296 * Add buffers containing data to the heap, update
297 * last_qs.
298 */
299 lib_ring_buffer_wait_for_qs(config, chan);
300 }
301
302 *ret_buf = buf = lttng_heap_maximum(heap);
303 if (buf) {
304 /*
305 * If this warning triggers, you probably need to check your
306 * system interrupt latency. Typical causes: too many printk()
307 * output going to a serial console with interrupts off.
308 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
309 * Observed on SMP KVM setups with trace_clock().
310 */
311 if (chan->iter.last_timestamp
312 > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
313 printk(KERN_WARNING "ring_buffer: timestamps going "
314 "backward. Last time %llu ns, cpu %d, "
315 "current time %llu ns, cpu %d, "
316 "delta %llu ns.\n",
317 chan->iter.last_timestamp, chan->iter.last_cpu,
318 buf->iter.timestamp, buf->backend.cpu,
319 chan->iter.last_timestamp - buf->iter.timestamp);
320 CHAN_WARN_ON(chan, 1);
321 }
322 chan->iter.last_timestamp = buf->iter.timestamp;
323 chan->iter.last_cpu = buf->backend.cpu;
324 return buf->iter.payload_len;
325 } else {
326 /* Heap is empty */
327 if (list_empty(&chan->iter.empty_head))
328 return -ENODATA; /* All buffers finalized */
329 else
330 return -EAGAIN; /* Temporarily empty */
331 }
332 }
333 EXPORT_SYMBOL_GPL(channel_get_next_record);
334
335 static
336 void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
337 {
338 if (buf->iter.allocated)
339 return;
340
341 buf->iter.allocated = 1;
342 if (chan->iter.read_open && !buf->iter.read_open) {
343 CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
344 buf->iter.read_open = 1;
345 }
346
347 /* Add to list of buffers without any current record */
348 if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
349 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
350 }
351
352 #ifdef CONFIG_HOTPLUG_CPU
353 static
354 int channel_iterator_cpu_hotplug(struct notifier_block *nb,
355 unsigned long action,
356 void *hcpu)
357 {
358 unsigned int cpu = (unsigned long)hcpu;
359 struct channel *chan = container_of(nb, struct channel,
360 hp_iter_notifier);
361 struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
362 const struct lib_ring_buffer_config *config = &chan->backend.config;
363
364 if (!chan->hp_iter_enable)
365 return NOTIFY_DONE;
366
367 CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
368
369 switch (action) {
370 case CPU_DOWN_FAILED:
371 case CPU_DOWN_FAILED_FROZEN:
372 case CPU_ONLINE:
373 case CPU_ONLINE_FROZEN:
374 lib_ring_buffer_iterator_init(chan, buf);
375 return NOTIFY_OK;
376 default:
377 return NOTIFY_DONE;
378 }
379 }
380 #endif
381
382 int channel_iterator_init(struct channel *chan)
383 {
384 const struct lib_ring_buffer_config *config = &chan->backend.config;
385 struct lib_ring_buffer *buf;
386
387 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
388 int cpu, ret;
389
390 INIT_LIST_HEAD(&chan->iter.empty_head);
391 ret = lttng_heap_init(&chan->iter.heap,
392 num_possible_cpus(),
393 GFP_KERNEL, buf_is_higher);
394 if (ret)
395 return ret;
396 /*
397 * In case of non-hotplug cpu, if the ring-buffer is allocated
398 * in early initcall, it will not be notified of secondary cpus.
399 * In that off case, we need to allocate for all possible cpus.
400 */
401 #ifdef CONFIG_HOTPLUG_CPU
402 chan->hp_iter_notifier.notifier_call =
403 channel_iterator_cpu_hotplug;
404 chan->hp_iter_notifier.priority = 10;
405 register_cpu_notifier(&chan->hp_iter_notifier);
406 get_online_cpus();
407 for_each_online_cpu(cpu) {
408 buf = per_cpu_ptr(chan->backend.buf, cpu);
409 lib_ring_buffer_iterator_init(chan, buf);
410 }
411 chan->hp_iter_enable = 1;
412 put_online_cpus();
413 #else
414 for_each_possible_cpu(cpu) {
415 buf = per_cpu_ptr(chan->backend.buf, cpu);
416 lib_ring_buffer_iterator_init(chan, buf);
417 }
418 #endif
419 } else {
420 buf = channel_get_ring_buffer(config, chan, 0);
421 lib_ring_buffer_iterator_init(chan, buf);
422 }
423 return 0;
424 }
425
426 void channel_iterator_unregister_notifiers(struct channel *chan)
427 {
428 const struct lib_ring_buffer_config *config = &chan->backend.config;
429
430 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
431 chan->hp_iter_enable = 0;
432 unregister_cpu_notifier(&chan->hp_iter_notifier);
433 }
434 }
435
436 void channel_iterator_free(struct channel *chan)
437 {
438 const struct lib_ring_buffer_config *config = &chan->backend.config;
439
440 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
441 lttng_heap_free(&chan->iter.heap);
442 }
443
444 int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
445 {
446 struct channel *chan = buf->backend.chan;
447 const struct lib_ring_buffer_config *config = &chan->backend.config;
448 CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
449 return lib_ring_buffer_open_read(buf);
450 }
451 EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);
452
453 /*
454 * Note: Iterators must not be mixed with other types of outputs, because an
455 * iterator can leave the buffer in "GET" state, which is not consistent with
456 * other types of output (mmap, splice, raw data read).
457 */
458 void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
459 {
460 lib_ring_buffer_release_read(buf);
461 }
462 EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
463
464 int channel_iterator_open(struct channel *chan)
465 {
466 const struct lib_ring_buffer_config *config = &chan->backend.config;
467 struct lib_ring_buffer *buf;
468 int ret = 0, cpu;
469
470 CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
471
472 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
473 get_online_cpus();
474 /* Allow CPU hotplug to keep track of opened reader */
475 chan->iter.read_open = 1;
476 for_each_channel_cpu(cpu, chan) {
477 buf = channel_get_ring_buffer(config, chan, cpu);
478 ret = lib_ring_buffer_iterator_open(buf);
479 if (ret)
480 goto error;
481 buf->iter.read_open = 1;
482 }
483 put_online_cpus();
484 } else {
485 buf = channel_get_ring_buffer(config, chan, 0);
486 ret = lib_ring_buffer_iterator_open(buf);
487 }
488 return ret;
489 error:
490 /* Error should always happen on CPU 0, hence no close is required. */
491 CHAN_WARN_ON(chan, cpu != 0);
492 put_online_cpus();
493 return ret;
494 }
495 EXPORT_SYMBOL_GPL(channel_iterator_open);
496
497 void channel_iterator_release(struct channel *chan)
498 {
499 const struct lib_ring_buffer_config *config = &chan->backend.config;
500 struct lib_ring_buffer *buf;
501 int cpu;
502
503 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
504 get_online_cpus();
505 for_each_channel_cpu(cpu, chan) {
506 buf = channel_get_ring_buffer(config, chan, cpu);
507 if (buf->iter.read_open) {
508 lib_ring_buffer_iterator_release(buf);
509 buf->iter.read_open = 0;
510 }
511 }
512 chan->iter.read_open = 0;
513 put_online_cpus();
514 } else {
515 buf = channel_get_ring_buffer(config, chan, 0);
516 lib_ring_buffer_iterator_release(buf);
517 }
518 }
519 EXPORT_SYMBOL_GPL(channel_iterator_release);
520
521 void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
522 {
523 struct channel *chan = buf->backend.chan;
524
525 if (buf->iter.state != ITER_GET_SUBBUF)
526 lib_ring_buffer_put_next_subbuf(buf);
527 buf->iter.state = ITER_GET_SUBBUF;
528 /* Remove from heap (if present). */
529 if (lttng_heap_cherrypick(&chan->iter.heap, buf))
530 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
531 buf->iter.timestamp = 0;
532 buf->iter.header_len = 0;
533 buf->iter.payload_len = 0;
534 buf->iter.consumed = 0;
535 buf->iter.read_offset = 0;
536 buf->iter.data_size = 0;
537 /* Don't reset allocated and read_open */
538 }
539
540 void channel_iterator_reset(struct channel *chan)
541 {
542 const struct lib_ring_buffer_config *config = &chan->backend.config;
543 struct lib_ring_buffer *buf;
544 int cpu;
545
546 /* Empty heap, put into empty_head */
547 while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
548 list_add(&buf->iter.empty_node, &chan->iter.empty_head);
549
550 for_each_channel_cpu(cpu, chan) {
551 buf = channel_get_ring_buffer(config, chan, cpu);
552 lib_ring_buffer_iterator_reset(buf);
553 }
554 /* Don't reset read_open */
555 chan->iter.last_qs = 0;
556 chan->iter.last_timestamp = 0;
557 chan->iter.last_cpu = 0;
558 chan->iter.len_left = 0;
559 }
560
561 /*
562 * Ring buffer payload extraction read() implementation.
563 */
564 static
565 ssize_t channel_ring_buffer_file_read(struct file *filp,
566 char __user *user_buf,
567 size_t count,
568 loff_t *ppos,
569 struct channel *chan,
570 struct lib_ring_buffer *buf,
571 int fusionmerge)
572 {
573 const struct lib_ring_buffer_config *config = &chan->backend.config;
574 size_t read_count = 0, read_offset;
575 ssize_t len;
576
577 might_sleep();
578 if (!access_ok(VERIFY_WRITE, user_buf, count))
579 return -EFAULT;
580
581 /* Finish copy of previous record */
582 if (*ppos != 0) {
583 if (read_count < count) {
584 len = chan->iter.len_left;
585 read_offset = *ppos;
586 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
587 && fusionmerge)
588 buf = lttng_heap_maximum(&chan->iter.heap);
589 CHAN_WARN_ON(chan, !buf);
590 goto skip_get_next;
591 }
592 }
593
594 while (read_count < count) {
595 size_t copy_len, space_left;
596
597 if (fusionmerge)
598 len = channel_get_next_record(chan, &buf);
599 else
600 len = lib_ring_buffer_get_next_record(chan, buf);
601 len_test:
602 if (len < 0) {
603 /*
604 * Check if buffer is finalized (end of file).
605 */
606 if (len == -ENODATA) {
607 /* A 0 read_count will tell about end of file */
608 goto nodata;
609 }
610 if (filp->f_flags & O_NONBLOCK) {
611 if (!read_count)
612 read_count = -EAGAIN;
613 goto nodata;
614 } else {
615 int error;
616
617 /*
618 * No data available at the moment, return what
619 * we got.
620 */
621 if (read_count)
622 goto nodata;
623
624 /*
625 * Wait for returned len to be >= 0 or -ENODATA.
626 */
627 if (fusionmerge)
628 error = wait_event_interruptible(
629 chan->read_wait,
630 ((len = channel_get_next_record(chan,
631 &buf)), len != -EAGAIN));
632 else
633 error = wait_event_interruptible(
634 buf->read_wait,
635 ((len = lib_ring_buffer_get_next_record(
636 chan, buf)), len != -EAGAIN));
637 CHAN_WARN_ON(chan, len == -EBUSY);
638 if (error) {
639 read_count = error;
640 goto nodata;
641 }
642 CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
643 goto len_test;
644 }
645 }
646 read_offset = buf->iter.read_offset;
647 skip_get_next:
648 space_left = count - read_count;
649 if (len <= space_left) {
650 copy_len = len;
651 chan->iter.len_left = 0;
652 *ppos = 0;
653 } else {
654 copy_len = space_left;
655 chan->iter.len_left = len - copy_len;
656 *ppos = read_offset + copy_len;
657 }
658 if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
659 &user_buf[read_count],
660 copy_len)) {
661 /*
662 * Leave the len_left and ppos values at their current
663 * state, as we currently have a valid event to read.
664 */
665 return -EFAULT;
666 }
667 read_count += copy_len;
668 };
669 return read_count;
670
671 nodata:
672 *ppos = 0;
673 chan->iter.len_left = 0;
674 return read_count;
675 }
676
677 /**
678 * lib_ring_buffer_file_read - Read buffer record payload.
679 * @filp: file structure pointer.
680 * @buffer: user buffer to read data into.
681 * @count: number of bytes to read.
682 * @ppos: file read position.
683 *
684 * Returns a negative value on error, or the number of bytes read on success.
685 * ppos is used to save the position _within the current record_ between calls
686 * to read().
687 */
688 static
689 ssize_t lib_ring_buffer_file_read(struct file *filp,
690 char __user *user_buf,
691 size_t count,
692 loff_t *ppos)
693 {
694 struct inode *inode = filp->f_dentry->d_inode;
695 struct lib_ring_buffer *buf = inode->i_private;
696 struct channel *chan = buf->backend.chan;
697
698 return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
699 chan, buf, 0);
700 }
701
702 /**
703 * channel_file_read - Read channel record payload.
704 * @filp: file structure pointer.
705 * @buffer: user buffer to read data into.
706 * @count: number of bytes to read.
707 * @ppos: file read position.
708 *
709 * Returns a negative value on error, or the number of bytes read on success.
710 * ppos is used to save the position _within the current record_ between calls
711 * to read().
712 */
713 static
714 ssize_t channel_file_read(struct file *filp,
715 char __user *user_buf,
716 size_t count,
717 loff_t *ppos)
718 {
719 struct inode *inode = filp->f_dentry->d_inode;
720 struct channel *chan = inode->i_private;
721 const struct lib_ring_buffer_config *config = &chan->backend.config;
722
723 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
724 return channel_ring_buffer_file_read(filp, user_buf, count,
725 ppos, chan, NULL, 1);
726 else {
727 struct lib_ring_buffer *buf =
728 channel_get_ring_buffer(config, chan, 0);
729 return channel_ring_buffer_file_read(filp, user_buf, count,
730 ppos, chan, buf, 0);
731 }
732 }
733
734 static
735 int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
736 {
737 struct lib_ring_buffer *buf = inode->i_private;
738 int ret;
739
740 ret = lib_ring_buffer_iterator_open(buf);
741 if (ret)
742 return ret;
743
744 file->private_data = buf;
745 ret = nonseekable_open(inode, file);
746 if (ret)
747 goto release_iter;
748 return 0;
749
750 release_iter:
751 lib_ring_buffer_iterator_release(buf);
752 return ret;
753 }
754
755 static
756 int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
757 {
758 struct lib_ring_buffer *buf = inode->i_private;
759
760 lib_ring_buffer_iterator_release(buf);
761 return 0;
762 }
763
764 static
765 int channel_file_open(struct inode *inode, struct file *file)
766 {
767 struct channel *chan = inode->i_private;
768 int ret;
769
770 ret = channel_iterator_open(chan);
771 if (ret)
772 return ret;
773
774 file->private_data = chan;
775 ret = nonseekable_open(inode, file);
776 if (ret)
777 goto release_iter;
778 return 0;
779
780 release_iter:
781 channel_iterator_release(chan);
782 return ret;
783 }
784
785 static
786 int channel_file_release(struct inode *inode, struct file *file)
787 {
788 struct channel *chan = inode->i_private;
789
790 channel_iterator_release(chan);
791 return 0;
792 }
793
794 const struct file_operations channel_payload_file_operations = {
795 .owner = THIS_MODULE,
796 .open = channel_file_open,
797 .release = channel_file_release,
798 .read = channel_file_read,
799 .llseek = lib_ring_buffer_no_llseek,
800 };
801 EXPORT_SYMBOL_GPL(channel_payload_file_operations);
802
803 const struct file_operations lib_ring_buffer_payload_file_operations = {
804 .owner = THIS_MODULE,
805 .open = lib_ring_buffer_file_open,
806 .release = lib_ring_buffer_file_release,
807 .read = lib_ring_buffer_file_read,
808 .llseek = lib_ring_buffer_no_llseek,
809 };
810 EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
This page took 0.047338 seconds and 5 git commands to generate.