Add a packet sequence number
[lttng-modules.git] / lib / ringbuffer / ring_buffer_backend.c
1 /*
2 * ring_buffer_backend.c
3 *
4 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; only
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 #include <linux/stddef.h>
22 #include <linux/module.h>
23 #include <linux/string.h>
24 #include <linux/bitops.h>
25 #include <linux/delay.h>
26 #include <linux/errno.h>
27 #include <linux/slab.h>
28 #include <linux/cpu.h>
29 #include <linux/mm.h>
30
31 #include "../../wrapper/vmalloc.h" /* for wrapper_vmalloc_sync_all() */
32 #include "../../wrapper/ringbuffer/config.h"
33 #include "../../wrapper/ringbuffer/backend.h"
34 #include "../../wrapper/ringbuffer/frontend.h"
35
36 /**
37 * lib_ring_buffer_backend_allocate - allocate a channel buffer
38 * @config: ring buffer instance configuration
39 * @bufb: the ring buffer backend
40 * @size: total size of the buffer
41 * @num_subbuf: number of subbuffers
42 * @extra_reader_sb: need extra subbuffer for reader
43 */
44 static
45 int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
46 struct lib_ring_buffer_backend *bufb,
47 size_t size, size_t num_subbuf,
48 int extra_reader_sb)
49 {
50 struct channel_backend *chanb = &bufb->chan->backend;
51 unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
52 unsigned long subbuf_size, mmap_offset = 0;
53 unsigned long num_subbuf_alloc;
54 struct page **pages;
55 void **virt;
56 unsigned long i;
57
58 num_pages = size >> PAGE_SHIFT;
59 num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
60 subbuf_size = chanb->subbuf_size;
61 num_subbuf_alloc = num_subbuf;
62
63 if (extra_reader_sb) {
64 num_pages += num_pages_per_subbuf; /* Add pages for reader */
65 num_subbuf_alloc++;
66 }
67
68 pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
69 1 << INTERNODE_CACHE_SHIFT),
70 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
71 if (unlikely(!pages))
72 goto pages_error;
73
74 virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
75 1 << INTERNODE_CACHE_SHIFT),
76 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
77 if (unlikely(!virt))
78 goto virt_error;
79
80 bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
81 * num_subbuf_alloc,
82 1 << INTERNODE_CACHE_SHIFT),
83 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
84 if (unlikely(!bufb->array))
85 goto array_error;
86
87 for (i = 0; i < num_pages; i++) {
88 pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
89 GFP_KERNEL | __GFP_ZERO, 0);
90 if (unlikely(!pages[i]))
91 goto depopulate;
92 virt[i] = page_address(pages[i]);
93 }
94 bufb->num_pages_per_subbuf = num_pages_per_subbuf;
95
96 /* Allocate backend pages array elements */
97 for (i = 0; i < num_subbuf_alloc; i++) {
98 bufb->array[i] =
99 kzalloc_node(ALIGN(
100 sizeof(struct lib_ring_buffer_backend_pages) +
101 sizeof(struct lib_ring_buffer_backend_page)
102 * num_pages_per_subbuf,
103 1 << INTERNODE_CACHE_SHIFT),
104 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
105 if (!bufb->array[i])
106 goto free_array;
107 }
108
109 /* Allocate write-side subbuffer table */
110 bufb->buf_wsb = kzalloc_node(ALIGN(
111 sizeof(struct lib_ring_buffer_backend_subbuffer)
112 * num_subbuf,
113 1 << INTERNODE_CACHE_SHIFT),
114 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
115 if (unlikely(!bufb->buf_wsb))
116 goto free_array;
117
118 for (i = 0; i < num_subbuf; i++)
119 bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
120
121 /* Assign read-side subbuffer table */
122 if (extra_reader_sb)
123 bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
124 num_subbuf_alloc - 1);
125 else
126 bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
127
128 /* Allocate subbuffer packet counter table */
129 bufb->buf_cnt = kzalloc_node(ALIGN(
130 sizeof(struct lib_ring_buffer_backend_counts)
131 * num_subbuf,
132 1 << INTERNODE_CACHE_SHIFT),
133 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
134 if (unlikely(!bufb->buf_cnt))
135 goto free_wsb;
136
137 /* Assign pages to page index */
138 for (i = 0; i < num_subbuf_alloc; i++) {
139 for (j = 0; j < num_pages_per_subbuf; j++) {
140 CHAN_WARN_ON(chanb, page_idx > num_pages);
141 bufb->array[i]->p[j].virt = virt[page_idx];
142 bufb->array[i]->p[j].page = pages[page_idx];
143 page_idx++;
144 }
145 if (config->output == RING_BUFFER_MMAP) {
146 bufb->array[i]->mmap_offset = mmap_offset;
147 mmap_offset += subbuf_size;
148 }
149 }
150
151 /*
152 * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
153 * will not fault.
154 */
155 wrapper_vmalloc_sync_all();
156 kfree(virt);
157 kfree(pages);
158 return 0;
159
160 free_wsb:
161 kfree(bufb->buf_wsb);
162 free_array:
163 for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
164 kfree(bufb->array[i]);
165 depopulate:
166 /* Free all allocated pages */
167 for (i = 0; (i < num_pages && pages[i]); i++)
168 __free_page(pages[i]);
169 kfree(bufb->array);
170 array_error:
171 kfree(virt);
172 virt_error:
173 kfree(pages);
174 pages_error:
175 return -ENOMEM;
176 }
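/*
 * Worked example of the page accounting above (illustrative numbers only,
 * assuming a 4 KiB PAGE_SIZE, a 64 KiB buffer, 4 sub-buffers and an
 * overwrite-mode channel, i.e. extra_reader_sb != 0):
 *
 *   num_pages            = 64 KiB >> PAGE_SHIFT      = 16
 *   num_pages_per_subbuf = 16 >> get_count_order(4)  = 4
 *   num_pages + reader   = 16 + 4                    = 20
 *   num_subbuf_alloc     = 4 + 1                     = 5
 *
 * so 20 zeroed pages are allocated and spread over 5 backend page tables
 * of 4 page slots each, the extra table being reserved for the
 * reader-owned sub-buffer.
 */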
177
178 int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
179 struct channel_backend *chanb, int cpu)
180 {
181 const struct lib_ring_buffer_config *config = &chanb->config;
182
183 bufb->chan = container_of(chanb, struct channel, backend);
184 bufb->cpu = cpu;
185
186 return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
187 chanb->num_subbuf,
188 chanb->extra_reader_sb);
189 }
190
191 void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
192 {
193 struct channel_backend *chanb = &bufb->chan->backend;
194 unsigned long i, j, num_subbuf_alloc;
195
196 num_subbuf_alloc = chanb->num_subbuf;
197 if (chanb->extra_reader_sb)
198 num_subbuf_alloc++;
199
200 kfree(bufb->buf_wsb);
201 kfree(bufb->buf_cnt);
202 for (i = 0; i < num_subbuf_alloc; i++) {
203 for (j = 0; j < bufb->num_pages_per_subbuf; j++)
204 __free_page(bufb->array[i]->p[j].page);
205 kfree(bufb->array[i]);
206 }
207 kfree(bufb->array);
208 bufb->allocated = 0;
209 }
210
211 void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
212 {
213 struct channel_backend *chanb = &bufb->chan->backend;
214 const struct lib_ring_buffer_config *config = &chanb->config;
215 unsigned long num_subbuf_alloc;
216 unsigned int i;
217
218 num_subbuf_alloc = chanb->num_subbuf;
219 if (chanb->extra_reader_sb)
220 num_subbuf_alloc++;
221
222 for (i = 0; i < chanb->num_subbuf; i++)
223 bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
224 if (chanb->extra_reader_sb)
225 bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
226 num_subbuf_alloc - 1);
227 else
228 bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
229
230 for (i = 0; i < num_subbuf_alloc; i++) {
231 /* Don't reset mmap_offset */
232 v_set(config, &bufb->array[i]->records_commit, 0);
233 v_set(config, &bufb->array[i]->records_unread, 0);
234 bufb->array[i]->data_size = 0;
235 /* Don't reset backend page and virt addresses */
236 }
237 /* Don't reset num_pages_per_subbuf, cpu, allocated */
238 v_set(config, &bufb->records_read, 0);
239 }
240
241 /*
242 * The frontend is responsible for also calling ring_buffer_backend_reset for
243 * each buffer when calling channel_backend_reset.
244 */
245 void channel_backend_reset(struct channel_backend *chanb)
246 {
247 struct channel *chan = container_of(chanb, struct channel, backend);
248 const struct lib_ring_buffer_config *config = &chanb->config;
249
250 /*
251 * Don't reset buf_size, subbuf_size, subbuf_size_order,
252 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
253 * priv, notifiers, config, cpumask and name.
254 */
255 chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
256 }
257
258 #ifdef CONFIG_HOTPLUG_CPU
259 /**
260 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
261 * @nb: notifier block
262 * @action: hotplug action to take
263 * @hcpu: CPU number
264 *
265 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
266 */
267 static
268 int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
269 unsigned long action,
270 void *hcpu)
271 {
272 unsigned int cpu = (unsigned long)hcpu;
273 struct channel_backend *chanb = container_of(nb, struct channel_backend,
274 cpu_hp_notifier);
275 const struct lib_ring_buffer_config *config = &chanb->config;
276 struct lib_ring_buffer *buf;
277 int ret;
278
279 CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
280
281 switch (action) {
282 case CPU_UP_PREPARE:
283 case CPU_UP_PREPARE_FROZEN:
284 buf = per_cpu_ptr(chanb->buf, cpu);
285 ret = lib_ring_buffer_create(buf, chanb, cpu);
286 if (ret) {
287 printk(KERN_ERR
288 "ring_buffer_cpu_hp_callback: cpu %d "
289 "buffer creation failed\n", cpu);
290 return NOTIFY_BAD;
291 }
292 break;
293 case CPU_DEAD:
294 case CPU_DEAD_FROZEN:
295 /* No need to do a buffer switch here: it will either happen when
296 * tracing is stopped, or be done by the switch timer's CPU_DEAD
297 * callback. */
298 break;
299 }
300 return NOTIFY_OK;
301 }
302 #endif
303
304 /**
305 * channel_backend_init - initialize a channel backend
306 * @chanb: channel backend
307 * @name: channel name
308 * @config: client ring buffer configuration
309 * @priv: client private data
311 * @subbuf_size: size of sub-buffers (>= PAGE_SIZE, power of 2)
312 * @num_subbuf: number of sub-buffers (power of 2)
313 *
314 * Returns 0 on success, a negative error code on failure.
315 *
316 * Creates per-cpu channel buffers using the sizes and attributes
317 * specified. The created channel buffer files will be named
318 * name_0...name_N-1. File permissions will be %S_IRUSR.
319 *
320 * Called with CPU hotplug disabled.
321 */
322 int channel_backend_init(struct channel_backend *chanb,
323 const char *name,
324 const struct lib_ring_buffer_config *config,
325 void *priv, size_t subbuf_size, size_t num_subbuf)
326 {
327 struct channel *chan = container_of(chanb, struct channel, backend);
328 unsigned int i;
329 int ret;
330
331 if (!name)
332 return -EPERM;
333
334 /* Check that the subbuffer size is at least one page. */
335 if (subbuf_size < PAGE_SIZE)
336 return -EINVAL;
337
338 /*
339 * Make sure the number of subbuffers and the subbuffer size are
340 * each a nonzero power of 2.
341 */
342 if (!subbuf_size || (subbuf_size & (subbuf_size - 1)))
343 return -EINVAL;
344 if (!num_subbuf || (num_subbuf & (num_subbuf - 1)))
345 return -EINVAL;
346 /*
347 * Overwrite mode buffers require at least 2 subbuffers per
348 * buffer.
349 */
350 if (config->mode == RING_BUFFER_OVERWRITE && num_subbuf < 2)
351 return -EINVAL;
352
353 ret = subbuffer_id_check_index(config, num_subbuf);
354 if (ret)
355 return ret;
356
357 chanb->priv = priv;
358 chanb->buf_size = num_subbuf * subbuf_size;
359 chanb->subbuf_size = subbuf_size;
360 chanb->buf_size_order = get_count_order(chanb->buf_size);
361 chanb->subbuf_size_order = get_count_order(subbuf_size);
362 chanb->num_subbuf_order = get_count_order(num_subbuf);
363 chanb->extra_reader_sb =
364 (config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
365 chanb->num_subbuf = num_subbuf;
366 strlcpy(chanb->name, name, NAME_MAX);
367 memcpy(&chanb->config, config, sizeof(chanb->config));
368
369 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
370 if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
371 return -ENOMEM;
372 }
373
374 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
375 /* Allocating the buffer per-cpu structures */
376 chanb->buf = alloc_percpu(struct lib_ring_buffer);
377 if (!chanb->buf)
378 goto free_cpumask;
379
380 /*
381 * Without CPU hotplug support, a ring buffer allocated from an
382 * early initcall is never notified of secondary CPUs coming up,
383 * so we must allocate buffers for all possible CPUs up front.
384 */
385 #ifdef CONFIG_HOTPLUG_CPU
386 /*
387 * buf->backend.allocated test takes care of concurrent CPU
388 * hotplug.
389 * Our notifier priority is higher than the frontend's, so the
390 * ring buffer is created before the frontend starts its timer.
391 */
392 chanb->cpu_hp_notifier.notifier_call =
393 lib_ring_buffer_cpu_hp_callback;
394 chanb->cpu_hp_notifier.priority = 5;
395 register_hotcpu_notifier(&chanb->cpu_hp_notifier);
396
397 get_online_cpus();
398 for_each_online_cpu(i) {
399 ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
400 chanb, i);
401 if (ret)
402 goto free_bufs; /* cpu hotplug locked */
403 }
404 put_online_cpus();
405 #else
406 for_each_possible_cpu(i) {
407 ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
408 chanb, i);
409 if (ret)
410 goto free_bufs; /* cpu hotplug locked */
411 }
412 #endif
413 } else {
414 chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL);
415 if (!chanb->buf)
416 goto free_cpumask;
417 ret = lib_ring_buffer_create(chanb->buf, chanb, -1);
418 if (ret)
419 goto free_bufs;
420 }
421 chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
422
423 return 0;
424
425 free_bufs:
426 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
427 for_each_possible_cpu(i) {
428 struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
429
430 if (!buf->backend.allocated)
431 continue;
432 lib_ring_buffer_free(buf);
433 }
434 #ifdef CONFIG_HOTPLUG_CPU
435 put_online_cpus();
436 #endif
437 free_percpu(chanb->buf);
438 } else
439 kfree(chanb->buf);
440 free_cpumask:
441 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
442 free_cpumask_var(chanb->cpumask);
443 return -ENOMEM;
444 }
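/*
 * Minimal caller sketch (hypothetical; in-tree, the frontend's
 * channel_create() performs this call, and "client_config" and
 * "client_priv" below are assumed client-side names, not part of
 * this file):
 *
 *   struct channel *chan = kzalloc(sizeof(*chan), GFP_KERNEL);
 *   int ret;
 *
 *   if (!chan)
 *           return NULL;
 *   ret = channel_backend_init(&chan->backend, "my_chan",
 *                              &client_config, client_priv,
 *                              4 * PAGE_SIZE, 8);
 *   if (ret) {
 *           kfree(chan);
 *           return NULL;
 *   }
 *
 * Both 4 * PAGE_SIZE (subbuf_size) and 8 (num_subbuf) satisfy the
 * power-of-2 constraints checked above.
 */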
445
446 /**
447 * channel_backend_unregister_notifiers - unregister notifiers
448 * @chanb: the channel backend
449 *
450 * Holds CPU hotplug.
451 */
452 void channel_backend_unregister_notifiers(struct channel_backend *chanb)
453 {
454 const struct lib_ring_buffer_config *config = &chanb->config;
455
456 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
457 unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
458 }
459
460 /**
461 * channel_backend_free - destroy the channel
462 * @chanb: the channel backend
463 *
464 * Destroys all channel buffers and releases the backend's memory.
465 */
466 void channel_backend_free(struct channel_backend *chanb)
467 {
468 const struct lib_ring_buffer_config *config = &chanb->config;
469 unsigned int i;
470
471 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
472 for_each_possible_cpu(i) {
473 struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
474
475 if (!buf->backend.allocated)
476 continue;
477 lib_ring_buffer_free(buf);
478 }
479 free_cpumask_var(chanb->cpumask);
480 free_percpu(chanb->buf);
481 } else {
482 struct lib_ring_buffer *buf = chanb->buf;
483
484 CHAN_WARN_ON(chanb, !buf->backend.allocated);
485 lib_ring_buffer_free(buf);
486 kfree(buf);
487 }
488 }
489
490 /**
491 * lib_ring_buffer_write - write data to a ring_buffer buffer.
492 * @bufb : buffer backend
493 * @offset : offset within the buffer
494 * @src : source address
495 * @len : length to write
496 * @pagecpy : number of bytes already copied within the current page
497 */
498 void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
499 const void *src, size_t len, size_t pagecpy)
500 {
501 struct channel_backend *chanb = &bufb->chan->backend;
502 const struct lib_ring_buffer_config *config = &chanb->config;
503 size_t sbidx, index;
504 struct lib_ring_buffer_backend_pages *rpages;
505 unsigned long sb_bindex, id;
506
507 do {
508 len -= pagecpy;
509 src += pagecpy;
510 offset += pagecpy;
511 sbidx = offset >> chanb->subbuf_size_order;
512 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
513
514 /*
515 * Underlying layer should never ask for writes across
516 * subbuffers.
517 */
518 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
519
520 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
521 id = bufb->buf_wsb[sbidx].id;
522 sb_bindex = subbuffer_id_get_index(config, id);
523 rpages = bufb->array[sb_bindex];
524 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
525 && subbuffer_id_is_noref(config, id));
526 lib_ring_buffer_do_copy(config,
527 rpages->p[index].virt
528 + (offset & ~PAGE_MASK),
529 src, pagecpy);
530 } while (unlikely(len != pagecpy));
531 }
532 EXPORT_SYMBOL_GPL(_lib_ring_buffer_write);
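/*
 * The slow-path copy loops in this file all decompose a buffer offset
 * the same way (illustrative numbers, assuming a 16 KiB sub-buffer
 * size and a 4 KiB PAGE_SIZE):
 *
 *   offset = 0x5234
 *   sbidx  = offset >> subbuf_size_order             = 0x5234 >> 14 = 1
 *   index  = (offset & (subbuf_size - 1)) >> PAGE_SHIFT = 0x1234 >> 12 = 1
 *   page offset = offset & ~PAGE_MASK                = 0x234
 *
 * i.e. the write lands 0x234 bytes into page 1 of the backend page
 * table currently mapped to write-side sub-buffer 1.
 */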
533
534
535 /**
536 * lib_ring_buffer_memset - write len bytes of c to a ring_buffer buffer.
537 * @bufb : buffer backend
538 * @offset : offset within the buffer
539 * @c : the byte to write
540 * @len : length to write
541 * @pagecpy : number of bytes already copied within the current page
542 */
543 void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
544 size_t offset,
545 int c, size_t len, size_t pagecpy)
546 {
547 struct channel_backend *chanb = &bufb->chan->backend;
548 const struct lib_ring_buffer_config *config = &chanb->config;
549 size_t sbidx, index;
550 struct lib_ring_buffer_backend_pages *rpages;
551 unsigned long sb_bindex, id;
552
553 do {
554 len -= pagecpy;
555 offset += pagecpy;
556 sbidx = offset >> chanb->subbuf_size_order;
557 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
558
559 /*
560 * Underlying layer should never ask for writes across
561 * subbuffers.
562 */
563 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
564
565 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
566 id = bufb->buf_wsb[sbidx].id;
567 sb_bindex = subbuffer_id_get_index(config, id);
568 rpages = bufb->array[sb_bindex];
569 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
570 && subbuffer_id_is_noref(config, id));
571 lib_ring_buffer_do_memset(rpages->p[index].virt
572 + (offset & ~PAGE_MASK),
573 c, pagecpy);
574 } while (unlikely(len != pagecpy));
575 }
576 EXPORT_SYMBOL_GPL(_lib_ring_buffer_memset);
577
578 /**
579 * lib_ring_buffer_strcpy - write string data to a ring_buffer buffer.
580 * @bufb : buffer backend
581 * @offset : offset within the buffer
582 * @src : source address
583 * @len : length to write
584 * @pagecpy : number of bytes already copied within the current page
585 * @pad : character to use for padding
586 */
587 void _lib_ring_buffer_strcpy(struct lib_ring_buffer_backend *bufb,
588 size_t offset, const char *src, size_t len,
589 size_t pagecpy, int pad)
590 {
591 struct channel_backend *chanb = &bufb->chan->backend;
592 const struct lib_ring_buffer_config *config = &chanb->config;
593 size_t sbidx, index;
594 struct lib_ring_buffer_backend_pages *rpages;
595 unsigned long sb_bindex, id;
596 int src_terminated = 0;
597
598 CHAN_WARN_ON(chanb, !len);
599 offset += pagecpy;
600 do {
601 len -= pagecpy;
602 if (!src_terminated)
603 src += pagecpy;
604 sbidx = offset >> chanb->subbuf_size_order;
605 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
606
607 /*
608 * Underlying layer should never ask for writes across
609 * subbuffers.
610 */
611 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
612
613 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
614 id = bufb->buf_wsb[sbidx].id;
615 sb_bindex = subbuffer_id_get_index(config, id);
616 rpages = bufb->array[sb_bindex];
617 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
618 && subbuffer_id_is_noref(config, id));
619
620 if (likely(!src_terminated)) {
621 size_t count, to_copy;
622
623 to_copy = pagecpy;
624 if (pagecpy == len)
625 to_copy--; /* Final '\0' */
626 count = lib_ring_buffer_do_strcpy(config,
627 rpages->p[index].virt
628 + (offset & ~PAGE_MASK),
629 src, to_copy);
630 offset += count;
631 /* Padding */
632 if (unlikely(count < to_copy)) {
633 size_t pad_len = to_copy - count;
634
635 /* Next pages will have padding */
636 src_terminated = 1;
637 lib_ring_buffer_do_memset(rpages->p[index].virt
638 + (offset & ~PAGE_MASK),
639 pad, pad_len);
640 offset += pad_len;
641 }
642 } else {
643 size_t pad_len;
644
645 pad_len = pagecpy;
646 if (pagecpy == len)
647 pad_len--; /* Final '\0' */
648 lib_ring_buffer_do_memset(rpages->p[index].virt
649 + (offset & ~PAGE_MASK),
650 pad, pad_len);
651 offset += pad_len;
652 }
653 } while (unlikely(len != pagecpy));
654 /* Ending '\0' */
655 lib_ring_buffer_do_memset(rpages->p[index].virt + (offset & ~PAGE_MASK),
656 '\0', 1);
657 }
658 EXPORT_SYMBOL_GPL(_lib_ring_buffer_strcpy);
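/*
 * Overall effect of a strcpy write (illustrative): for an 8-byte
 * reservation, the source string "hi" and pad character '#', the
 * record ends up containing:
 *
 *   'h' 'i' '#' '#' '#' '#' '#' '\0'
 *
 * At most len - 1 source bytes are copied, the remainder of the
 * reservation is filled with the pad character, and the final byte
 * is always a '\0' terminator.
 */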
659
660 /**
661 * lib_ring_buffer_copy_from_user_inatomic - write user data to a ring_buffer buffer.
662 * @bufb : buffer backend
663 * @offset : offset within the buffer
664 * @src : source address
665 * @len : length to write
666 * @pagecpy : number of bytes already copied within the current page
667 *
668 * This function deals with userspace pointers; it should never be called
669 * directly without the src pointer having been checked with access_ok()
670 * beforehand.
671 */
672 void _lib_ring_buffer_copy_from_user_inatomic(struct lib_ring_buffer_backend *bufb,
673 size_t offset,
674 const void __user *src, size_t len,
675 size_t pagecpy)
676 {
677 struct channel_backend *chanb = &bufb->chan->backend;
678 const struct lib_ring_buffer_config *config = &chanb->config;
679 size_t sbidx, index;
680 struct lib_ring_buffer_backend_pages *rpages;
681 unsigned long sb_bindex, id;
682 int ret;
683
684 do {
685 len -= pagecpy;
686 src += pagecpy;
687 offset += pagecpy;
688 sbidx = offset >> chanb->subbuf_size_order;
689 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
690
691 /*
692 * Underlying layer should never ask for writes across
693 * subbuffers.
694 */
695 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
696
697 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
698 id = bufb->buf_wsb[sbidx].id;
699 sb_bindex = subbuffer_id_get_index(config, id);
700 rpages = bufb->array[sb_bindex];
701 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
702 && subbuffer_id_is_noref(config, id));
703 ret = lib_ring_buffer_do_copy_from_user_inatomic(rpages->p[index].virt
704 + (offset & ~PAGE_MASK),
705 src, pagecpy) != 0;
706 if (ret > 0) {
707 /* Copy failed: only a pass/fail status is known here, so */
708 /* zero-fill the whole remainder of the record. */
709 _lib_ring_buffer_memset(bufb, offset, 0, len, 0);
710 break; /* stop copy */
711 }
712 } while (unlikely(len != pagecpy));
713 }
714 EXPORT_SYMBOL_GPL(_lib_ring_buffer_copy_from_user_inatomic);
715
716 /**
717 * lib_ring_buffer_strcpy_from_user_inatomic - write userspace string data to a ring_buffer buffer.
718 * @bufb : buffer backend
719 * @offset : offset within the buffer
720 * @src : source address
721 * @len : length to write
722 * @pagecpy : number of bytes already copied within the current page
723 * @pad : character to use for padding
724 *
725 * This function deals with userspace pointers; it should never be called
726 * directly without the src pointer having been checked with access_ok()
727 * beforehand.
728 */
729 void _lib_ring_buffer_strcpy_from_user_inatomic(struct lib_ring_buffer_backend *bufb,
730 size_t offset, const char __user *src, size_t len,
731 size_t pagecpy, int pad)
732 {
733 struct channel_backend *chanb = &bufb->chan->backend;
734 const struct lib_ring_buffer_config *config = &chanb->config;
735 size_t sbidx, index;
736 struct lib_ring_buffer_backend_pages *rpages;
737 unsigned long sb_bindex, id;
738 int src_terminated = 0;
739
740 offset += pagecpy;
741 do {
742 len -= pagecpy;
743 if (!src_terminated)
744 src += pagecpy;
745 sbidx = offset >> chanb->subbuf_size_order;
746 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
747
748 /*
749 * Underlying layer should never ask for writes across
750 * subbuffers.
751 */
752 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
753
754 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
755 id = bufb->buf_wsb[sbidx].id;
756 sb_bindex = subbuffer_id_get_index(config, id);
757 rpages = bufb->array[sb_bindex];
758 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
759 && subbuffer_id_is_noref(config, id));
760
761 if (likely(!src_terminated)) {
762 size_t count, to_copy;
763
764 to_copy = pagecpy;
765 if (pagecpy == len)
766 to_copy--; /* Final '\0' */
767 count = lib_ring_buffer_do_strcpy_from_user_inatomic(config,
768 rpages->p[index].virt
769 + (offset & ~PAGE_MASK),
770 src, to_copy);
771 offset += count;
772 /* Padding */
773 if (unlikely(count < to_copy)) {
774 size_t pad_len = to_copy - count;
775
776 /* Next pages will have padding */
777 src_terminated = 1;
778 lib_ring_buffer_do_memset(rpages->p[index].virt
779 + (offset & ~PAGE_MASK),
780 pad, pad_len);
781 offset += pad_len;
782 }
783 } else {
784 size_t pad_len;
785
786 pad_len = pagecpy;
787 if (pagecpy == len)
788 pad_len--; /* Final '\0' */
789 lib_ring_buffer_do_memset(rpages->p[index].virt
790 + (offset & ~PAGE_MASK),
791 pad, pad_len);
792 offset += pad_len;
793 }
794 } while (unlikely(len != pagecpy));
795 /* Ending '\0' */
796 lib_ring_buffer_do_memset(rpages->p[index].virt + (offset & ~PAGE_MASK),
797 '\0', 1);
798 }
799 EXPORT_SYMBOL_GPL(_lib_ring_buffer_strcpy_from_user_inatomic);
800
801 /**
802 * lib_ring_buffer_read - read data from a ring_buffer buffer.
803 * @bufb : buffer backend
804 * @offset : offset within the buffer
805 * @dest : destination address
806 * @len : length to copy to destination
807 *
808 * Should be protected by get_subbuf/put_subbuf.
809 * Returns the length copied.
810 */
811 size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
812 void *dest, size_t len)
813 {
814 struct channel_backend *chanb = &bufb->chan->backend;
815 const struct lib_ring_buffer_config *config = &chanb->config;
816 size_t index, pagecpy, orig_len;
817 struct lib_ring_buffer_backend_pages *rpages;
818 unsigned long sb_bindex, id;
819
820 orig_len = len;
821 offset &= chanb->buf_size - 1;
822 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
823 if (unlikely(!len))
824 return 0;
825 for (;;) {
826 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
827 id = bufb->buf_rsb.id;
828 sb_bindex = subbuffer_id_get_index(config, id);
829 rpages = bufb->array[sb_bindex];
830 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
831 && subbuffer_id_is_noref(config, id));
832 memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
833 pagecpy);
834 len -= pagecpy;
835 if (likely(!len))
836 break;
837 dest += pagecpy;
838 offset += pagecpy;
839 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
840 /*
841 * Underlying layer should never ask for reads across
842 * subbuffers.
843 */
844 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
845 }
846 return orig_len;
847 }
848 EXPORT_SYMBOL_GPL(lib_ring_buffer_read);
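/*
 * Reader-side sketch (hypothetical; the exact frontend helpers and
 * their signatures are defined in frontend.h, and buf, consumed, dest
 * and nr_bytes are assumed caller-side variables):
 *
 *   if (!lib_ring_buffer_get_subbuf(buf, consumed)) {
 *           lib_ring_buffer_read(&buf->backend, consumed, dest, nr_bytes);
 *           lib_ring_buffer_put_subbuf(buf);
 *   }
 *
 * The copy is only safe while the reader owns the sub-buffer, hence
 * the get_subbuf/put_subbuf requirement in the kernel-doc above.
 */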
849
850 /**
851 * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
852 * @bufb : buffer backend
853 * @offset : offset within the buffer
854 * @dest : destination userspace address
855 * @len : length to copy to destination
856 *
857 * Should be protected by get_subbuf/put_subbuf.
858 * access_ok() must have been performed on the dest address prior to calling
859 * this function.
860 * Returns -EFAULT on error, 0 if ok.
861 */
862 int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
863 size_t offset, void __user *dest, size_t len)
864 {
865 struct channel_backend *chanb = &bufb->chan->backend;
866 const struct lib_ring_buffer_config *config = &chanb->config;
867 size_t index;
868 ssize_t pagecpy;
869 struct lib_ring_buffer_backend_pages *rpages;
870 unsigned long sb_bindex, id;
871
872 offset &= chanb->buf_size - 1;
873 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
874 if (unlikely(!len))
875 return 0;
876 for (;;) {
877 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
878 id = bufb->buf_rsb.id;
879 sb_bindex = subbuffer_id_get_index(config, id);
880 rpages = bufb->array[sb_bindex];
881 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
882 && subbuffer_id_is_noref(config, id));
883 if (__copy_to_user(dest,
884 rpages->p[index].virt + (offset & ~PAGE_MASK),
885 pagecpy))
886 return -EFAULT;
887 len -= pagecpy;
888 if (likely(!len))
889 break;
890 dest += pagecpy;
891 offset += pagecpy;
892 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
893 /*
894 * Underlying layer should never ask for reads across
895 * subbuffers.
896 */
897 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
898 }
899 return 0;
900 }
901 EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);
902
903 /**
904 * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
905 * @bufb : buffer backend
906 * @offset : offset within the buffer
907 * @dest : destination address
908 * @len : destination's length
909 *
910 * Return string's length, or -EINVAL on error.
911 * Should be protected by get_subbuf/put_subbuf.
912 * Destination length should be at least 1 to hold '\0'.
913 */
914 int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
915 void *dest, size_t len)
916 {
917 struct channel_backend *chanb = &bufb->chan->backend;
918 const struct lib_ring_buffer_config *config = &chanb->config;
919 size_t index;
920 ssize_t pagecpy, pagelen, strpagelen, orig_offset;
921 char *str;
922 struct lib_ring_buffer_backend_pages *rpages;
923 unsigned long sb_bindex, id;
924
925 offset &= chanb->buf_size - 1;
926 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
927 orig_offset = offset;
928 if (unlikely(!len))
929 return -EINVAL;
930 for (;;) {
931 id = bufb->buf_rsb.id;
932 sb_bindex = subbuffer_id_get_index(config, id);
933 rpages = bufb->array[sb_bindex];
934 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
935 && subbuffer_id_is_noref(config, id));
936 str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
937 pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
938 strpagelen = strnlen(str, pagelen);
939 if (len) {
940 pagecpy = min_t(size_t, len, strpagelen);
941 if (dest) {
942 memcpy(dest, str, pagecpy);
943 dest += pagecpy;
944 }
945 len -= pagecpy;
946 }
947 offset += strpagelen;
948 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
949 if (strpagelen < pagelen)
950 break;
951 /*
952 * Underlying layer should never ask for reads across
953 * subbuffers.
954 */
955 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
956 }
957 if (dest && len)
958 ((char *)dest)[0] = 0;
959 return offset - orig_offset;
960 }
961 EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr);
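/*
 * Example (illustrative): if the record at @offset holds the bytes
 * 'h' 'e' 'l' 'l' 'o' '\0' and @len is 8, the call copies "hello" into
 * dest, appends the terminating '\0' and returns 5 (the string length,
 * not counting the terminator).
 */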
962
963 /**
964 * lib_ring_buffer_read_get_page - Get a whole page to read from
965 * @bufb : buffer backend
966 * @offset : offset within the buffer
967 * @virt : pointer to page address (output)
968 *
969 * Should be protected by get_subbuf/put_subbuf.
970 * Returns a pointer to the page's struct page pointer.
971 */
972 struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb,
973 size_t offset, void ***virt)
974 {
975 size_t index;
976 struct lib_ring_buffer_backend_pages *rpages;
977 struct channel_backend *chanb = &bufb->chan->backend;
978 const struct lib_ring_buffer_config *config = &chanb->config;
979 unsigned long sb_bindex, id;
980
981 offset &= chanb->buf_size - 1;
982 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
983 id = bufb->buf_rsb.id;
984 sb_bindex = subbuffer_id_get_index(config, id);
985 rpages = bufb->array[sb_bindex];
986 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
987 && subbuffer_id_is_noref(config, id));
988 *virt = &rpages->p[index].virt;
989 return &rpages->p[index].page;
990 }
991 EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page);
992
993 /**
994 * lib_ring_buffer_read_offset_address - get address of a buffer location
995 * @bufb : buffer backend
996 * @offset : offset within the buffer.
997 *
998 * Return the address where a given offset is located (for read).
999 * Should be used to get the current subbuffer header pointer. Given we know
1000 * it's never on a page boundary, it's safe to read/write directly
1001 * from/to this address, as long as the read/write is never bigger than a
1002 * page size.
1003 */
1004 void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
1005 size_t offset)
1006 {
1007 size_t index;
1008 struct lib_ring_buffer_backend_pages *rpages;
1009 struct channel_backend *chanb = &bufb->chan->backend;
1010 const struct lib_ring_buffer_config *config = &chanb->config;
1011 unsigned long sb_bindex, id;
1012
1013 offset &= chanb->buf_size - 1;
1014 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
1015 id = bufb->buf_rsb.id;
1016 sb_bindex = subbuffer_id_get_index(config, id);
1017 rpages = bufb->array[sb_bindex];
1018 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
1019 && subbuffer_id_is_noref(config, id));
1020 return rpages->p[index].virt + (offset & ~PAGE_MASK);
1021 }
1022 EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address);
1023
1024 /**
1025 * lib_ring_buffer_offset_address - get address of a location within the buffer
1026 * @bufb : buffer backend
1027 * @offset : offset within the buffer.
1028 *
1029 * Return the address where a given offset is located.
1030 * Should be used to get the current subbuffer header pointer. Given we know
1031 * it's always at the beginning of a page, it's safe to write directly to this
1032 * address, as long as the write is never bigger than a page size.
1033 */
1034 void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
1035 size_t offset)
1036 {
1037 size_t sbidx, index;
1038 struct lib_ring_buffer_backend_pages *rpages;
1039 struct channel_backend *chanb = &bufb->chan->backend;
1040 const struct lib_ring_buffer_config *config = &chanb->config;
1041 unsigned long sb_bindex, id;
1042
1043 offset &= chanb->buf_size - 1;
1044 sbidx = offset >> chanb->subbuf_size_order;
1045 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
1046 id = bufb->buf_wsb[sbidx].id;
1047 sb_bindex = subbuffer_id_get_index(config, id);
1048 rpages = bufb->array[sb_bindex];
1049 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
1050 && subbuffer_id_is_noref(config, id));
1051 return rpages->p[index].virt + (offset & ~PAGE_MASK);
1052 }
1053 EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address);