Return error if subbuffer size is lower than a page
lttng-modules.git: lib/ringbuffer/ring_buffer_backend.c
/*
 * ring_buffer_backend.c
 *
 * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

#include <linux/stddef.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bitops.h>
#include <linux/delay.h>
#include <linux/errno.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/mm.h>

#include "../../wrapper/vmalloc.h" /* for wrapper_vmalloc_sync_all() */
#include "../../wrapper/ringbuffer/config.h"
#include "../../wrapper/ringbuffer/backend.h"
#include "../../wrapper/ringbuffer/frontend.h"

/**
 * lib_ring_buffer_backend_allocate - allocate a channel buffer
 * @config: ring buffer instance configuration
 * @bufb: buffer backend
 * @size: total size of the buffer
 * @num_subbuf: number of subbuffers
 * @extra_reader_sb: nonzero if an extra subbuffer is needed for the reader
 */
static
int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
                                     struct lib_ring_buffer_backend *bufb,
                                     size_t size, size_t num_subbuf,
                                     int extra_reader_sb)
{
        struct channel_backend *chanb = &bufb->chan->backend;
        unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
        unsigned long subbuf_size, mmap_offset = 0;
        unsigned long num_subbuf_alloc;
        struct page **pages;
        void **virt;
        unsigned long i;

        num_pages = size >> PAGE_SHIFT;
        num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
        subbuf_size = chanb->subbuf_size;
        num_subbuf_alloc = num_subbuf;

        if (extra_reader_sb) {
                num_pages += num_pages_per_subbuf; /* Add pages for reader */
                num_subbuf_alloc++;
        }

        pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
                                   1 << INTERNODE_CACHE_SHIFT),
                             GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
        if (unlikely(!pages))
                goto pages_error;

        virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
                                  1 << INTERNODE_CACHE_SHIFT),
                            GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
        if (unlikely(!virt))
                goto virt_error;

        bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
                                         * num_subbuf_alloc,
                                   1 << INTERNODE_CACHE_SHIFT),
                                   GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
        if (unlikely(!bufb->array))
                goto array_error;

        for (i = 0; i < num_pages; i++) {
                pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
                                            GFP_KERNEL | __GFP_ZERO, 0);
                if (unlikely(!pages[i]))
                        goto depopulate;
                virt[i] = page_address(pages[i]);
        }
        bufb->num_pages_per_subbuf = num_pages_per_subbuf;

        /* Allocate backend pages array elements */
        for (i = 0; i < num_subbuf_alloc; i++) {
                bufb->array[i] =
                        kzalloc_node(ALIGN(
                                sizeof(struct lib_ring_buffer_backend_pages) +
                                sizeof(struct lib_ring_buffer_backend_page)
                                * num_pages_per_subbuf,
                                1 << INTERNODE_CACHE_SHIFT),
                                GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
                if (!bufb->array[i])
                        goto free_array;
        }

        /* Allocate write-side subbuffer table */
        bufb->buf_wsb = kzalloc_node(ALIGN(
                                sizeof(struct lib_ring_buffer_backend_subbuffer)
                                * num_subbuf,
                                1 << INTERNODE_CACHE_SHIFT),
                                GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
        if (unlikely(!bufb->buf_wsb))
                goto free_array;

        for (i = 0; i < num_subbuf; i++)
                bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);

        /* Assign read-side subbuffer table */
        if (extra_reader_sb)
                bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
                                                num_subbuf_alloc - 1);
        else
                bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

        /* Assign pages to page index */
        for (i = 0; i < num_subbuf_alloc; i++) {
                for (j = 0; j < num_pages_per_subbuf; j++) {
                        CHAN_WARN_ON(chanb, page_idx > num_pages);
                        bufb->array[i]->p[j].virt = virt[page_idx];
                        bufb->array[i]->p[j].page = pages[page_idx];
                        page_idx++;
                }
                if (config->output == RING_BUFFER_MMAP) {
                        bufb->array[i]->mmap_offset = mmap_offset;
                        mmap_offset += subbuf_size;
                }
        }

        /*
         * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
         * will not fault.
         */
        wrapper_vmalloc_sync_all();
        kfree(virt);
        kfree(pages);
        return 0;

free_array:
        for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
                kfree(bufb->array[i]);
depopulate:
        /* Free all allocated pages */
        for (i = 0; (i < num_pages && pages[i]); i++)
                __free_page(pages[i]);
        kfree(bufb->array);
array_error:
        kfree(virt);
virt_error:
        kfree(pages);
pages_error:
        return -ENOMEM;
}
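
/*
 * Worked example of the page accounting above (illustrative values only):
 * with size = 64 kB, num_subbuf = 4 and 4 kB pages, num_pages is
 * 64 kB >> PAGE_SHIFT = 16 and num_pages_per_subbuf is
 * 16 >> get_count_order(4) = 4. When extra_reader_sb is set (overwrite
 * mode), one additional sub-buffer is allocated for the reader, so
 * num_pages becomes 20 and num_subbuf_alloc becomes 5. For
 * RING_BUFFER_MMAP output, mmap_offset then advances by subbuf_size
 * (16 kB) for each of the 5 allocated sub-buffers.
 */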

int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
                                   struct channel_backend *chanb, int cpu)
{
        const struct lib_ring_buffer_config *config = chanb->config;

        bufb->chan = container_of(chanb, struct channel, backend);
        bufb->cpu = cpu;

        return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
                                                chanb->num_subbuf,
                                                chanb->extra_reader_sb);
}

void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
{
        struct channel_backend *chanb = &bufb->chan->backend;
        unsigned long i, j, num_subbuf_alloc;

        num_subbuf_alloc = chanb->num_subbuf;
        if (chanb->extra_reader_sb)
                num_subbuf_alloc++;

        kfree(bufb->buf_wsb);
        for (i = 0; i < num_subbuf_alloc; i++) {
                for (j = 0; j < bufb->num_pages_per_subbuf; j++)
                        __free_page(bufb->array[i]->p[j].page);
                kfree(bufb->array[i]);
        }
        kfree(bufb->array);
        bufb->allocated = 0;
}

void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
{
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned long num_subbuf_alloc;
        unsigned int i;

        num_subbuf_alloc = chanb->num_subbuf;
        if (chanb->extra_reader_sb)
                num_subbuf_alloc++;

        for (i = 0; i < chanb->num_subbuf; i++)
                bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
        if (chanb->extra_reader_sb)
                bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
                                                num_subbuf_alloc - 1);
        else
                bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

        for (i = 0; i < num_subbuf_alloc; i++) {
                /* Don't reset mmap_offset */
                v_set(config, &bufb->array[i]->records_commit, 0);
                v_set(config, &bufb->array[i]->records_unread, 0);
                bufb->array[i]->data_size = 0;
                /* Don't reset backend page and virt addresses */
        }
        /* Don't reset num_pages_per_subbuf, cpu, allocated */
        v_set(config, &bufb->records_read, 0);
}

/*
 * When calling channel_backend_reset, the frontend is responsible for also
 * calling lib_ring_buffer_backend_reset for each buffer.
 */
void channel_backend_reset(struct channel_backend *chanb)
{
        struct channel *chan = container_of(chanb, struct channel, backend);
        const struct lib_ring_buffer_config *config = chanb->config;

        /*
         * Don't reset buf_size, subbuf_size, subbuf_size_order,
         * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
         * priv, notifiers, config, cpumask and name.
         */
        chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
}

#ifdef CONFIG_HOTPLUG_CPU
/**
 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
 * @nb: notifier block
 * @action: hotplug action to take
 * @hcpu: CPU number
 *
 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
 */
static
int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
                                              unsigned long action,
                                              void *hcpu)
{
        unsigned int cpu = (unsigned long)hcpu;
        struct channel_backend *chanb = container_of(nb, struct channel_backend,
                                                     cpu_hp_notifier);
        const struct lib_ring_buffer_config *config = chanb->config;
        struct lib_ring_buffer *buf;
        int ret;

        CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

        switch (action) {
        case CPU_UP_PREPARE:
        case CPU_UP_PREPARE_FROZEN:
                buf = per_cpu_ptr(chanb->buf, cpu);
                ret = lib_ring_buffer_create(buf, chanb, cpu);
                if (ret) {
                        printk(KERN_ERR
                               "ring_buffer_cpu_hp_callback: cpu %d "
                               "buffer creation failed\n", cpu);
                        return NOTIFY_BAD;
                }
                break;
        case CPU_DEAD:
        case CPU_DEAD_FROZEN:
                /*
                 * No need to do a buffer switch here, because it will happen
                 * when tracing is stopped, or will be done by switch timer
                 * CPU DEAD callback.
                 */
                break;
        }
        return NOTIFY_OK;
}
#endif

/**
 * channel_backend_init - initialize a channel backend
 * @chanb: channel backend
 * @name: channel name
 * @config: client ring buffer configuration
 * @priv: client private data
 * @subbuf_size: size of sub-buffers (power of 2, at least PAGE_SIZE)
 * @num_subbuf: number of sub-buffers (power of 2)
 *
 * Returns 0 on success, a negative error value otherwise.
 *
 * Creates per-cpu channel buffers using the sizes and attributes
 * specified. The created channel buffer files will be named
 * name_0...name_N-1. File permissions will be %S_IRUSR.
 *
 * Called with CPU hotplug disabled.
 */
int channel_backend_init(struct channel_backend *chanb,
                         const char *name,
                         const struct lib_ring_buffer_config *config,
                         void *priv, size_t subbuf_size, size_t num_subbuf)
{
        struct channel *chan = container_of(chanb, struct channel, backend);
        unsigned int i;
        int ret;

        if (!name)
                return -EPERM;

        if (!(subbuf_size && num_subbuf))
                return -EPERM;

        /* Check that the subbuffer size is at least one page. */
        if (subbuf_size < PAGE_SIZE)
                return -EINVAL;

        /*
         * Make sure the number of subbuffers and subbuffer size are
         * power of 2.
         */
        CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
        CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);

        ret = subbuffer_id_check_index(config, num_subbuf);
        if (ret)
                return ret;

        chanb->priv = priv;
        chanb->buf_size = num_subbuf * subbuf_size;
        chanb->subbuf_size = subbuf_size;
        chanb->buf_size_order = get_count_order(chanb->buf_size);
        chanb->subbuf_size_order = get_count_order(subbuf_size);
        chanb->num_subbuf_order = get_count_order(num_subbuf);
        chanb->extra_reader_sb =
                (config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
        chanb->num_subbuf = num_subbuf;
        strlcpy(chanb->name, name, NAME_MAX);
        chanb->config = config;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
                        return -ENOMEM;
        }

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                /* Allocating the buffer per-cpu structures */
                chanb->buf = alloc_percpu(struct lib_ring_buffer);
                if (!chanb->buf)
                        goto free_cpumask;

                /*
                 * In case of non-hotplug cpu, if the ring-buffer is allocated
                 * in early initcall, it will not be notified of secondary cpus.
                 * In that case, we need to allocate for all possible cpus.
                 */
#ifdef CONFIG_HOTPLUG_CPU
                /*
                 * buf->backend.allocated test takes care of concurrent CPU
                 * hotplug.
                 * Priority higher than frontend, so we create the ring buffer
                 * before we start the timer.
                 */
                chanb->cpu_hp_notifier.notifier_call =
                                lib_ring_buffer_cpu_hp_callback;
                chanb->cpu_hp_notifier.priority = 5;
                register_hotcpu_notifier(&chanb->cpu_hp_notifier);

                get_online_cpus();
                for_each_online_cpu(i) {
                        ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
                                                     chanb, i);
                        if (ret)
                                goto free_bufs; /* cpu hotplug locked */
                }
                put_online_cpus();
#else
                for_each_possible_cpu(i) {
                        ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
                                                     chanb, i);
                        if (ret)
                                goto free_bufs; /* cpu hotplug locked */
                }
#endif
        } else {
                chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL);
                if (!chanb->buf)
                        goto free_cpumask;
                ret = lib_ring_buffer_create(chanb->buf, chanb, -1);
                if (ret)
                        goto free_bufs;
        }
        chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);

        return 0;

free_bufs:
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(i) {
                        struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);

                        if (!buf->backend.allocated)
                                continue;
                        lib_ring_buffer_free(buf);
                }
#ifdef CONFIG_HOTPLUG_CPU
                put_online_cpus();
#endif
                free_percpu(chanb->buf);
        } else
                kfree(chanb->buf);
free_cpumask:
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                free_cpumask_var(chanb->cpumask);
        return -ENOMEM;
}
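
/*
 * Hypothetical usage sketch (the client structure, variable names and error
 * label below are illustrative assumptions, not APIs defined in this file):
 * a ring buffer client embedding struct channel would typically pass a
 * power-of-2 sub-buffer size of at least PAGE_SIZE and a power-of-2
 * sub-buffer count, e.g.:
 *
 *      chan = kzalloc(sizeof(*chan), GFP_KERNEL);
 *      if (!chan)
 *              return -ENOMEM;
 *      ret = channel_backend_init(&chan->backend, "my_chan", &client_config,
 *                                 client_priv, 2 * PAGE_SIZE, 8);
 *      if (ret)
 *              goto error_free;
 */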

/**
 * channel_backend_unregister_notifiers - unregister notifiers
 * @chanb: the channel backend
 *
 * Holds CPU hotplug.
 */
void channel_backend_unregister_notifiers(struct channel_backend *chanb)
{
        const struct lib_ring_buffer_config *config = chanb->config;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
}

/**
 * channel_backend_free - destroy the channel backend
 * @chanb: the channel backend
 *
 * Destroys all channel buffers and frees the channel backend's resources.
 */
void channel_backend_free(struct channel_backend *chanb)
{
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned int i;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(i) {
                        struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);

                        if (!buf->backend.allocated)
                                continue;
                        lib_ring_buffer_free(buf);
                }
                free_cpumask_var(chanb->cpumask);
                free_percpu(chanb->buf);
        } else {
                struct lib_ring_buffer *buf = chanb->buf;

                CHAN_WARN_ON(chanb, !buf->backend.allocated);
                lib_ring_buffer_free(buf);
                kfree(buf);
        }
}

/**
 * _lib_ring_buffer_write - write data to a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @src : source address
 * @len : length to write
 * @pagecpy : number of bytes already copied within the current page
 */
void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
                            const void *src, size_t len, ssize_t pagecpy)
{
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        size_t sbidx, index;
        struct lib_ring_buffer_backend_pages *rpages;
        unsigned long sb_bindex, id;

        do {
                len -= pagecpy;
                src += pagecpy;
                offset += pagecpy;
                sbidx = offset >> chanb->subbuf_size_order;
                index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;

                /*
                 * Underlying layer should never ask for writes across
                 * subbuffers.
                 */
                CHAN_WARN_ON(chanb, offset >= chanb->buf_size);

                pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
                id = bufb->buf_wsb[sbidx].id;
                sb_bindex = subbuffer_id_get_index(config, id);
                rpages = bufb->array[sb_bindex];
                CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                             && subbuffer_id_is_noref(config, id));
                lib_ring_buffer_do_copy(config,
                                        rpages->p[index].virt
                                        + (offset & ~PAGE_MASK),
                                        src, pagecpy);
        } while (unlikely(len != pagecpy));
}
EXPORT_SYMBOL_GPL(_lib_ring_buffer_write);
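
/*
 * Worked example of the offset arithmetic above (illustrative values only):
 * with subbuf_size = 16 kB (subbuf_size_order = 14), 4 kB pages and
 * offset = 0x5010, sbidx = 0x5010 >> 14 = 1 (second sub-buffer) and
 * index = (0x5010 & 0x3fff) >> PAGE_SHIFT = 1 (second page of that
 * sub-buffer), so the copy destination is
 * rpages->p[1].virt + (0x5010 & ~PAGE_MASK) = rpages->p[1].virt + 0x10.
 */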

/**
 * lib_ring_buffer_read - read data from a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the length copied.
 */
size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
                            void *dest, size_t len)
{
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        size_t index;
        ssize_t pagecpy, orig_len;
        struct lib_ring_buffer_backend_pages *rpages;
        unsigned long sb_bindex, id;

        orig_len = len;
        offset &= chanb->buf_size - 1;
        index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
        if (unlikely(!len))
                return 0;
        for (;;) {
                pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
                id = bufb->buf_rsb.id;
                sb_bindex = subbuffer_id_get_index(config, id);
                rpages = bufb->array[sb_bindex];
                CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                             && subbuffer_id_is_noref(config, id));
                memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
                       pagecpy);
                len -= pagecpy;
                if (likely(!len))
                        break;
                dest += pagecpy;
                offset += pagecpy;
                index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
                /*
                 * Underlying layer should never ask for reads across
                 * subbuffers.
                 */
                CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
        }
        return orig_len;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read);
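
/*
 * Hypothetical usage sketch (the 'buf', 'read_offset' and 'record' names are
 * illustrative assumptions coming from a frontend read path, not APIs defined
 * here): a reader holding a sub-buffer through the frontend's
 * get_subbuf/put_subbuf protocol can copy a record out with, e.g.:
 *
 *      char record[256];
 *      size_t copied;
 *
 *      copied = lib_ring_buffer_read(&buf->backend, read_offset,
 *                                    record, sizeof(record));
 */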

/**
 * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination userspace address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * access_ok() must have been performed on dest addresses prior to calling this
 * function.
 * Returns -EFAULT on error, 0 if ok.
 */
int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
                                   size_t offset, void __user *dest, size_t len)
{
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        size_t index;
        ssize_t pagecpy, orig_len;
        struct lib_ring_buffer_backend_pages *rpages;
        unsigned long sb_bindex, id;

        orig_len = len;
        offset &= chanb->buf_size - 1;
        index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
        if (unlikely(!len))
                return 0;
        for (;;) {
                pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
                id = bufb->buf_rsb.id;
                sb_bindex = subbuffer_id_get_index(config, id);
                rpages = bufb->array[sb_bindex];
                CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                             && subbuffer_id_is_noref(config, id));
                if (__copy_to_user(dest,
                                   rpages->p[index].virt + (offset & ~PAGE_MASK),
                                   pagecpy))
                        return -EFAULT;
                len -= pagecpy;
                if (likely(!len))
                        break;
                dest += pagecpy;
                offset += pagecpy;
                index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
                /*
                 * Underlying layer should never ask for reads across
                 * subbuffers.
                 */
                CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
        }
        return 0;
}
EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);
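
/*
 * Hypothetical usage sketch (the handler shape and the 'user_ptr', 'count',
 * 'buf' and 'read_offset' names are illustrative assumptions): a read(2) or
 * ioctl() path would typically validate the destination first, then copy
 * straight from the sub-buffer pages:
 *
 *      if (!access_ok(VERIFY_WRITE, user_ptr, count))
 *              return -EFAULT;
 *      if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
 *                                         user_ptr, count))
 *              return -EFAULT;
 */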

/**
 * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : destination's length
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the string's length.
 */
int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
                              void *dest, size_t len)
{
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        size_t index;
        ssize_t pagecpy, pagelen, strpagelen, orig_offset;
        char *str;
        struct lib_ring_buffer_backend_pages *rpages;
        unsigned long sb_bindex, id;

        offset &= chanb->buf_size - 1;
        index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
        orig_offset = offset;
        for (;;) {
                id = bufb->buf_rsb.id;
                sb_bindex = subbuffer_id_get_index(config, id);
                rpages = bufb->array[sb_bindex];
                CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                             && subbuffer_id_is_noref(config, id));
                str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
                pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
                strpagelen = strnlen(str, pagelen);
                if (len) {
                        pagecpy = min_t(size_t, len, strpagelen);
                        if (dest) {
                                memcpy(dest, str, pagecpy);
                                dest += pagecpy;
                        }
                        len -= pagecpy;
                }
                offset += strpagelen;
                index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
                if (strpagelen < pagelen)
                        break;
                /*
                 * Underlying layer should never ask for reads across
                 * subbuffers.
                 */
                CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
        }
        if (dest && len)
                ((char *)dest)[0] = 0;
        return offset - orig_offset;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr);
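
/*
 * Hypothetical usage sketch ('buf' and 'field_offset' are illustrative
 * assumptions): to extract a NUL-terminated string field from a record, under
 * get_subbuf/put_subbuf protection:
 *
 *      char name[64];
 *      int string_len;
 *
 *      string_len = lib_ring_buffer_read_cstr(&buf->backend, field_offset,
 *                                             name, sizeof(name));
 *
 * The returned length is the string's length in the buffer, which can be
 * larger than the number of bytes actually copied into the destination.
 */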

/**
 * lib_ring_buffer_read_get_page - Get a whole page to read from
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @virt : pointer to page address (output)
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the pointer to the page struct pointer.
 */
struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb,
                                            size_t offset, void ***virt)
{
        size_t index;
        struct lib_ring_buffer_backend_pages *rpages;
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned long sb_bindex, id;

        offset &= chanb->buf_size - 1;
        index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
        id = bufb->buf_rsb.id;
        sb_bindex = subbuffer_id_get_index(config, id);
        rpages = bufb->array[sb_bindex];
        CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
        *virt = &rpages->p[index].virt;
        return &rpages->p[index].page;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page);
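
/*
 * Hypothetical usage sketch ('buf' and 'read_offset' are illustrative
 * assumptions): a zero-copy output path can look up the struct page backing
 * a read offset and hand it to page-oriented kernel APIs:
 *
 *      struct page **page;
 *      void **virt;
 *
 *      page = lib_ring_buffer_read_get_page(&buf->backend, read_offset, &virt);
 *
 * On return, *page is the backing struct page and *virt its kernel mapping.
 */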

/**
 * lib_ring_buffer_read_offset_address - get address of a buffer location
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located (for read).
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's never on a page boundary, it's safe to write directly to this address,
 * as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
                                          size_t offset)
{
        size_t index;
        struct lib_ring_buffer_backend_pages *rpages;
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned long sb_bindex, id;

        offset &= chanb->buf_size - 1;
        index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
        id = bufb->buf_rsb.id;
        sb_bindex = subbuffer_id_get_index(config, id);
        rpages = bufb->array[sb_bindex];
        CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
        return rpages->p[index].virt + (offset & ~PAGE_MASK);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address);

/**
 * lib_ring_buffer_offset_address - get address of a location within the buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located.
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's always at the beginning of a page, it's safe to write directly to this
 * address, as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
                                     size_t offset)
{
        size_t sbidx, index;
        struct lib_ring_buffer_backend_pages *rpages;
        struct channel_backend *chanb = &bufb->chan->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned long sb_bindex, id;

        offset &= chanb->buf_size - 1;
        sbidx = offset >> chanb->subbuf_size_order;
        index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
        id = bufb->buf_wsb[sbidx].id;
        sb_bindex = subbuffer_id_get_index(config, id);
        rpages = bufb->array[sb_bindex];
        CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
        return rpages->p[index].virt + (offset & ~PAGE_MASK);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address);
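
/*
 * Hypothetical usage sketch ('buf', 'chan', 'subbuf_index' and the header type
 * are illustrative assumptions): a client writing its sub-buffer header from
 * the write side would typically fetch the header address at a
 * sub-buffer-aligned offset:
 *
 *      struct my_subbuf_header *header =
 *              lib_ring_buffer_offset_address(&buf->backend,
 *                              subbuf_index * chan->backend.subbuf_size);
 *
 * The read-side counterpart, lib_ring_buffer_read_offset_address(), resolves
 * the offset through the reader-owned sub-buffer table (buf_rsb) rather than
 * the writer table (buf_wsb).
 */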