X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=0d0f26865da9fd4da5aa140c5afcb702c4e89f08;hb=16adecf1f2e80025667ed53f4905e725894f076a;hp=07724b71ceb8939b0dfaac3872dc0483de6fb451;hpb=15500a1bb134d6bbd409da5568308c2a41291928;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 07724b71..0d0f2686 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -60,6 +60,7 @@ #include #include #include +#include #include #include #include @@ -72,7 +73,7 @@ #include "backend.h" #include "frontend.h" #include "shm.h" -#include "tlsfixup.h" +#include "rb-init.h" #include "../liblttng-ust/compat.h" /* For ENODATA */ /* Print DBG() messages about events lost only every 1048576 hits */ @@ -84,6 +85,7 @@ #define CLOCKID CLOCK_MONOTONIC #define LTTNG_UST_RING_BUFFER_GET_RETRY 10 #define LTTNG_UST_RING_BUFFER_RETRY_DELAY_MS 10 +#define RETRY_DELAY_MS 100 /* 100 ms. */ /* * Non-static to ensure the compiler does not optimize away the xor. @@ -149,6 +151,21 @@ static struct timer_signal_data timer_signal = { .lock = PTHREAD_MUTEX_INITIALIZER, }; +static bool lttng_ust_allow_blocking; + +void lttng_ust_ringbuffer_set_allow_blocking(void) +{ + lttng_ust_allow_blocking = true; +} + +/* Get blocking timeout, in ms */ +static int lttng_ust_ringbuffer_get_timeout(struct channel *chan) +{ + if (!lttng_ust_allow_blocking) + return 0; + return chan->u.s.blocking_timeout_ms; +} + /** * lib_ring_buffer_reset - Reset ring buffer to initial values. * @buf: Ring buffer. @@ -901,11 +918,12 @@ static void channel_print_errors(struct channel *chan, } static void channel_free(struct channel *chan, - struct lttng_ust_shm_handle *handle) + struct lttng_ust_shm_handle *handle, + int consumer) { channel_backend_free(&chan->backend, handle); /* chan is freed by shm teardown */ - shm_object_table_destroy(handle->table); + shm_object_table_destroy(handle->table, consumer); free(handle); } @@ -940,7 +958,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval, - const int *stream_fds, int nr_stream_fds) + const int *stream_fds, int nr_stream_fds, + int64_t blocking_timeout) { int ret; size_t shmsize, chansize; @@ -948,6 +967,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff struct lttng_ust_shm_handle *handle; struct shm_object *shmobj; unsigned int nr_streams; + int64_t blocking_timeout_ms; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) nr_streams = num_possible_cpus(); @@ -957,6 +977,19 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff if (nr_stream_fds != nr_streams) return NULL; + if (blocking_timeout < -1) { + return NULL; + } + /* usec to msec */ + if (blocking_timeout == -1) { + blocking_timeout_ms = -1; + } else { + blocking_timeout_ms = blocking_timeout / 1000; + if (blocking_timeout_ms != (int32_t) blocking_timeout_ms) { + return NULL; + } + } + if (lib_ring_buffer_check_config(config, switch_timer_interval, read_timer_interval)) return NULL; @@ -981,7 +1014,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff /* Allocate normal memory for channel (not shared) */ shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM, - -1); + -1, -1); if (!shmobj) goto error_append; /* struct channel is at object 0, offset 0 (hardcoded) */ @@ -1010,6 +1043,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff *priv_data = NULL; } + chan->u.s.blocking_timeout_ms = (int32_t) blocking_timeout_ms; + ret = channel_backend_init(&chan->backend, name, config, subbuf_size, num_subbuf, handle, stream_fds); @@ -1028,7 +1063,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff error_backend_init: error_append: - shm_object_table_destroy(handle->table); + shm_object_table_destroy(handle->table, 1); error_table_alloc: free(handle); return NULL; @@ -1060,7 +1095,7 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data, return handle; error_table_object: - shm_object_table_destroy(handle->table); + shm_object_table_destroy(handle->table, 0); error_table_alloc: free(handle); return NULL; @@ -1088,9 +1123,10 @@ unsigned int channel_handle_get_nr_streams(struct lttng_ust_shm_handle *handle) } static -void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle) +void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle, + int consumer) { - channel_free(chan, handle); + channel_free(chan, handle, consumer); } /** @@ -1122,7 +1158,7 @@ void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, * sessiond/consumer are keeping a reference on the shm file * descriptor directly. No need to refcount. */ - channel_release(chan, handle); + channel_release(chan, handle, consumer); return; } @@ -1293,6 +1329,43 @@ nodata: return -EAGAIN; } +/** + * Performs the same function as lib_ring_buffer_snapshot(), but the positions + * are saved regardless of whether the consumed and produced positions are + * in the same subbuffer. + * @buf: ring buffer + * @consumed: consumed byte count indicating the last position read + * @produced: produced byte count indicating the last position written + * + * This function is meant to provide information on the exact producer and + * consumer positions without regard for the "snapshot" feature. + */ +int lib_ring_buffer_snapshot_sample_positions( + struct lttng_ust_lib_ring_buffer *buf, + unsigned long *consumed, unsigned long *produced, + struct lttng_ust_shm_handle *handle) +{ + struct channel *chan; + const struct lttng_ust_lib_ring_buffer_config *config; + + chan = shmp(handle, buf->backend.chan); + if (!chan) + return -EPERM; + config = &chan->backend.config; + cmm_smp_rmb(); + *consumed = uatomic_read(&buf->consumed); + /* + * No need to issue a memory barrier between consumed count read and + * write offset read, because consumed count can only change + * concurrently in overwrite mode, and we keep a sequence counter + * identifier derived from the write offset to check we are getting + * the same sub-buffer we are expecting (the sub-buffers are atomically + * "tagged" upon writes, tags are checked upon read). + */ + *produced = v_read(config, &buf->offset); + return 0; +} + /** * lib_ring_buffer_move_consumer - move consumed counter forward * @buf: ring buffer @@ -1983,6 +2056,23 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle); } +static +bool handle_blocking_retry(int *timeout_left_ms) +{ + int timeout = *timeout_left_ms, delay; + + if (caa_likely(!timeout)) + return false; /* Do not retry, discard event. */ + if (timeout < 0) /* Wait forever. */ + delay = RETRY_DELAY_MS; + else + delay = min_t(int, timeout, RETRY_DELAY_MS); + (void) poll(NULL, 0, delay); + if (timeout > 0) + *timeout_left_ms -= delay; + return true; /* Retry. */ +} + /* * Returns : * 0 if ok @@ -1994,11 +2084,13 @@ static int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - struct lttng_ust_lib_ring_buffer_ctx *ctx) + struct lttng_ust_lib_ring_buffer_ctx *ctx, + void *client_ctx) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct lttng_ust_shm_handle *handle = ctx->handle; unsigned long reserve_commit_diff, offset_cmp; + int timeout_left_ms = lttng_ust_ringbuffer_get_timeout(chan); retry: offsets->begin = offset_cmp = v_read(config, &buf->offset); @@ -2021,7 +2113,7 @@ retry: offsets->size = config->cb.record_header_size(config, chan, offsets->begin, &offsets->pre_header_padding, - ctx); + ctx, client_ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) @@ -2081,6 +2173,9 @@ retry: >= chan->backend.buf_size)) { unsigned long nr_lost; + if (handle_blocking_retry(&timeout_left_ms)) + goto retry; + /* * We do not overwrite non consumed buffers * and we are full : record is lost. @@ -2124,7 +2219,7 @@ retry: config->cb.record_header_size(config, chan, offsets->begin, &offsets->pre_header_padding, - ctx); + ctx, client_ctx); offsets->size += lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) @@ -2178,7 +2273,8 @@ retry: * -EIO for other errors, else returns 0. * It will take care of sub-buffer switching. */ -int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx) +int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx, + void *client_ctx) { struct channel *chan = ctx->chan; struct lttng_ust_shm_handle *handle = ctx->handle; @@ -2199,7 +2295,7 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx) do { ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets, - ctx); + ctx, client_ctx); if (caa_unlikely(ret)) return ret; } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,