/*
 * SPDX-License-Identifier: (LGPL-2.1-only or GPL-2.0-only)
 *
 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Ring Buffer Library Synchronization Header (internal helpers).
 *
 * See ring_buffer_frontend.c for more information on wait-free algorithms.
 */

#ifndef _LTTNG_RING_BUFFER_FRONTEND_INTERNAL_H
#define _LTTNG_RING_BUFFER_FRONTEND_INTERNAL_H

#include <urcu/compiler.h>
#include <urcu/tls-compat.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>

#include <lttng/ringbuffer-config.h>
#include "backend_types.h"
#include "frontend_types.h"
#include "shm.h"

/* Buffer offset macros */

/* buf_trunc mask selects only the buffer number. */
static inline
unsigned long buf_trunc(unsigned long offset,
			struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return offset & ~(chan->backend.buf_size - 1);
}

/* Select the buffer number value (counter). */
static inline
unsigned long buf_trunc_val(unsigned long offset,
			    struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return buf_trunc(offset, chan) >> chan->backend.buf_size_order;
}

/* buf_offset mask selects only the offset within the current buffer. */
static inline
unsigned long buf_offset(unsigned long offset,
			 struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return offset & (chan->backend.buf_size - 1);
}

/* subbuf_offset mask selects the offset within the current subbuffer. */
static inline
unsigned long subbuf_offset(unsigned long offset,
			    struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return offset & (chan->backend.subbuf_size - 1);
}

/* subbuf_trunc mask selects the subbuffer number. */
static inline
unsigned long subbuf_trunc(unsigned long offset,
			   struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return offset & ~(chan->backend.subbuf_size - 1);
}

/* subbuf_align aligns the offset to the next subbuffer. */
static inline
unsigned long subbuf_align(unsigned long offset,
			   struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return (offset + chan->backend.subbuf_size)
	       & ~(chan->backend.subbuf_size - 1);
}

/* subbuf_index returns the index of the current subbuffer within the buffer. */
static inline
unsigned long subbuf_index(unsigned long offset,
			   struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return buf_offset(offset, chan) >> chan->backend.subbuf_size_order;
}

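/*
 * Illustrative example of the helpers above (arbitrary values, not taken
 * from any real channel configuration): with buf_size = 1 MiB
 * (buf_size_order = 20) and subbuf_size = 256 KiB (subbuf_size_order = 18),
 * an offset of 0x4a123 decomposes as:
 *
 *	buf_trunc(0x4a123, chan)     == 0x00000	(buffer start)
 *	buf_trunc_val(0x4a123, chan) == 0	(buffer number)
 *	buf_offset(0x4a123, chan)    == 0x4a123	(offset within the buffer)
 *	subbuf_index(0x4a123, chan)  == 1	(second sub-buffer)
 *	subbuf_trunc(0x4a123, chan)  == 0x40000	(sub-buffer start)
 *	subbuf_offset(0x4a123, chan) == 0x0a123	(offset within the sub-buffer)
 *	subbuf_align(0x4a123, chan)  == 0x80000	(start of the next sub-buffer)
 */
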
/*
 * Last TSC comparison functions. Check if the current TSC overflows tsc_bits
 * bits from the last TSC read. When overflows are detected, the full 64-bit
 * timestamp counter should be written in the record header. Reads and writes
 * last_tsc atomically.
 */

#if (CAA_BITS_PER_LONG == 32)
static inline
void save_last_tsc(const struct lttng_ust_lib_ring_buffer_config *config,
		   struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc)
{
	if (config->tsc_bits == 0 || config->tsc_bits == 64)
		return;

	/*
	 * Ensure the compiler performs this update in a single instruction.
	 */
	v_set(config, &buf->last_tsc, (unsigned long)(tsc >> config->tsc_bits));
}

static inline
int last_tsc_overflow(const struct lttng_ust_lib_ring_buffer_config *config,
		      struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc)
{
	unsigned long tsc_shifted;

	if (config->tsc_bits == 0 || config->tsc_bits == 64)
		return 0;

	tsc_shifted = (unsigned long)(tsc >> config->tsc_bits);
	if (caa_unlikely(tsc_shifted
			 - (unsigned long)v_read(config, &buf->last_tsc)))
		return 1;
	else
		return 0;
}
#else
static inline
void save_last_tsc(const struct lttng_ust_lib_ring_buffer_config *config,
		   struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc)
{
	if (config->tsc_bits == 0 || config->tsc_bits == 64)
		return;

	v_set(config, &buf->last_tsc, (unsigned long)tsc);
}

static inline
int last_tsc_overflow(const struct lttng_ust_lib_ring_buffer_config *config,
		      struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc)
{
	if (config->tsc_bits == 0 || config->tsc_bits == 64)
		return 0;

	if (caa_unlikely((tsc - v_read(config, &buf->last_tsc))
			 >> config->tsc_bits))
		return 1;
	else
		return 0;
}
#endif

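/*
 * Illustrative sketch only (not part of this header): a client fast path
 * would typically pair these helpers around space reservation, requesting a
 * full 64-bit timestamp in the record header whenever the compact timestamp
 * overflows. The "ctx" structure and the RING_BUFFER_RFLAG_FULL_TSC record
 * flag referenced below are assumed to come from the frontend API headers:
 *
 *	if (caa_unlikely(last_tsc_overflow(config, buf, ctx->tsc)))
 *		ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
 *	...
 *	save_last_tsc(config, buf, ctx->tsc);	(once the reservation succeeded)
 */
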
__attribute__((visibility("hidden")))
extern
int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
				 void *client_ctx);

__attribute__((visibility("hidden")))
extern
void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf,
				 enum switch_mode mode,
				 struct lttng_ust_shm_handle *handle);

__attribute__((visibility("hidden")))
void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_config *config,
					struct lttng_ust_lib_ring_buffer *buf,
					struct lttng_ust_lib_ring_buffer_channel *chan,
					unsigned long offset,
					unsigned long commit_count,
					unsigned long idx,
					struct lttng_ust_shm_handle *handle,
					uint64_t tsc);

/* Buffer write helpers */

static inline
void lib_ring_buffer_reserve_push_reader(struct lttng_ust_lib_ring_buffer *buf,
					 struct lttng_ust_lib_ring_buffer_channel *chan,
					 unsigned long offset)
{
	unsigned long consumed_old, consumed_new;

	do {
		consumed_old = uatomic_read(&buf->consumed);
		/*
		 * If the buffer is in overwrite mode, push the reader consumed
		 * count if the write position has reached it and we are not
		 * at the first iteration (don't push the reader farther than
		 * the writer). This operation can be done concurrently by many
		 * writers in the same buffer; the writer at the farthest write
		 * position sub-buffer index in the buffer is the one which
		 * will win this loop.
		 */
		if (caa_unlikely(subbuf_trunc(offset, chan)
				 - subbuf_trunc(consumed_old, chan)
				 >= chan->backend.buf_size))
			consumed_new = subbuf_align(consumed_old, chan);
		else
			return;
	} while (caa_unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
					      consumed_new) != consumed_old));
}
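
/*
 * Illustrative example for the push condition above (arbitrary values):
 * with buf_size = 1 MiB and subbuf_size = 256 KiB, if consumed_old = 0x40000
 * and the writer has reached offset = 0x140000, then
 * subbuf_trunc(offset) - subbuf_trunc(consumed_old) = 0x100000 >= buf_size,
 * so the consumed position is pushed to subbuf_align(consumed_old) = 0x80000,
 * releasing the sub-buffer that is about to be overwritten.
 */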

/*
 * Move consumed position to the beginning of the subbuffer in which the
 * write offset is. Should only be used on ring buffers that are not
 * actively being written into, because clear_reader does not take into
 * account the commit counters when moving the consumed position, which
 * can make concurrent trace producers or consumers observe consumed
 * position further than the write offset, which breaks ring buffer
 * algorithm guarantees.
 */
static inline
void lib_ring_buffer_clear_reader(struct lttng_ust_lib_ring_buffer *buf,
				  struct lttng_ust_shm_handle *handle)
{
	struct lttng_ust_lib_ring_buffer_channel *chan;
	const struct lttng_ust_lib_ring_buffer_config *config;
	unsigned long offset, consumed_old, consumed_new;

	chan = shmp(handle, buf->backend.chan);
	if (!chan)
		return;
	config = &chan->backend.config;

	do {
		offset = v_read(config, &buf->offset);
		consumed_old = uatomic_read(&buf->consumed);
		CHAN_WARN_ON(chan, (long) (subbuf_trunc(offset, chan)
					   - subbuf_trunc(consumed_old, chan))
				   < 0);
		consumed_new = subbuf_trunc(offset, chan);
	} while (caa_unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
					      consumed_new) != consumed_old));
}

static inline
int lib_ring_buffer_pending_data(const struct lttng_ust_lib_ring_buffer_config *config,
				 struct lttng_ust_lib_ring_buffer *buf,
				 struct lttng_ust_lib_ring_buffer_channel *chan)
{
	return !!subbuf_offset(v_read(config, &buf->offset), chan);
}

static inline
unsigned long lib_ring_buffer_get_data_size(const struct lttng_ust_lib_ring_buffer_config *config,
					    struct lttng_ust_lib_ring_buffer *buf,
					    unsigned long idx,
					    struct lttng_ust_shm_handle *handle)
{
	return subbuffer_get_data_size(config, &buf->backend, idx, handle);
}

/*
 * Check if all space reservations in a buffer have been committed. This helps
 * to know whether an execution context is nested (for per-cpu buffers only).
 * This is a very specific ftrace use-case, so we keep this as an "internal" API.
 */
static inline
int lib_ring_buffer_reserve_committed(const struct lttng_ust_lib_ring_buffer_config *config,
				      struct lttng_ust_lib_ring_buffer *buf,
				      struct lttng_ust_lib_ring_buffer_channel *chan,
				      struct lttng_ust_shm_handle *handle)
{
	unsigned long offset, idx, commit_count;
	struct commit_counters_hot *cc_hot;

	CHAN_WARN_ON(chan, config->alloc != RING_BUFFER_ALLOC_PER_CPU);
	CHAN_WARN_ON(chan, config->sync != RING_BUFFER_SYNC_PER_CPU);

	/*
	 * Read offset and commit count in a loop so they are both read
	 * atomically wrt interrupts. We deal with interrupt concurrency by
	 * restarting both reads if the offset has been pushed. Note that given
	 * we only have to deal with interrupt concurrency here, an interrupt
	 * modifying the commit count will also modify "offset", so it is safe
	 * to only check for offset modifications.
	 */
	do {
		offset = v_read(config, &buf->offset);
		idx = subbuf_index(offset, chan);
		cc_hot = shmp_index(handle, buf->commit_hot, idx);
		if (caa_unlikely(!cc_hot))
			return 0;
		commit_count = v_read(config, &cc_hot->cc);
	} while (offset != v_read(config, &buf->offset));

	return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
		     - (commit_count & chan->commit_count_mask) == 0);
}

/*
 * Receive end of subbuffer TSC as parameter. It has been read in the
 * space reservation loop of either reserve or switch, which ensures it
 * progresses monotonically with event records in the buffer. Therefore,
 * it ensures that the end timestamp of a subbuffer is <= begin
 * timestamp of the following subbuffers.
 */
static inline
void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config *config,
				   struct lttng_ust_lib_ring_buffer *buf,
				   struct lttng_ust_lib_ring_buffer_channel *chan,
				   unsigned long offset,
				   unsigned long commit_count,
				   unsigned long idx,
				   struct lttng_ust_shm_handle *handle,
				   uint64_t tsc)
{
	unsigned long old_commit_count = commit_count
					 - chan->backend.subbuf_size;

	/* Check if all commits have been done */
	if (caa_unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
			 - (old_commit_count & chan->commit_count_mask) == 0))
		lib_ring_buffer_check_deliver_slow(config, buf, chan, offset,
				commit_count, idx, handle, tsc);
}

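/*
 * Illustrative example of the delivery test above (arbitrary values):
 * with num_subbuf = 4 (num_subbuf_order = 2), subbuf_size = 256 KiB and
 * buf_size = 1 MiB, consider a record ending at offset 0x14ffff, i.e. in
 * sub-buffer 1 during the second pass over the buffer. Then
 * buf_trunc(offset) >> num_subbuf_order == 0x100000 >> 2 == 0x40000, which
 * is 1 * subbuf_size. The commit counter of sub-buffer 1 accumulates
 * subbuf_size bytes per fully committed pass, so once this second pass is
 * fully committed, commit_count == 2 * subbuf_size and
 * old_commit_count == 1 * subbuf_size: the masked difference is 0 and the
 * sub-buffer is handed to the slow path for delivery.
 */
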
/*
 * lib_ring_buffer_write_commit_counter
 *
 * For flight recording. Must be called after commit.
 * This function increments the subbuffer's commit_seq counter each time the
 * commit count catches up with the reserve offset (modulo subbuffer size).
 * It is useful for crash dumps.
 */
static inline
void lib_ring_buffer_write_commit_counter(const struct lttng_ust_lib_ring_buffer_config *config,
					  struct lttng_ust_lib_ring_buffer *buf,
					  struct lttng_ust_lib_ring_buffer_channel *chan,
					  unsigned long buf_offset,
					  unsigned long commit_count,
					  struct lttng_ust_shm_handle *handle,
					  struct commit_counters_hot *cc_hot)
{
	unsigned long commit_seq_old;

	if (config->oops != RING_BUFFER_OOPS_CONSISTENCY)
		return;

	/*
	 * subbuf_offset includes commit_count_mask. We can simply
	 * compare the offsets within the subbuffer without caring about
	 * buffer full/empty mismatch because offset is never zero here
	 * (subbuffer header and record headers have non-zero length).
	 */
	if (caa_unlikely(subbuf_offset(buf_offset - commit_count, chan)))
		return;

	commit_seq_old = v_read(config, &cc_hot->seq);
	if (caa_likely((long) (commit_seq_old - commit_count) < 0))
		v_set(config, &cc_hot->seq, commit_count);
}

__attribute__((visibility("hidden")))
extern int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
				  struct channel_backend *chanb, int cpu,
				  struct lttng_ust_shm_handle *handle,
				  struct shm_object *shmobj);

__attribute__((visibility("hidden")))
extern void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf,
				 struct lttng_ust_shm_handle *handle);

/* Keep track of trap nesting inside ring buffer code */
__attribute__((visibility("hidden")))
extern DECLARE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);

#endif /* _LTTNG_RING_BUFFER_FRONTEND_INTERNAL_H */