X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=908539a8663973fde0a8e51384907b4690b0e30d;hb=26bb88df22d15a644fa44ef093bcb39a809922b7;hp=f9bf5c8d7a00f9d5f08216cd99d69e227757eea2;hpb=dddba3983889d49dd1ee7fbb5b44b9530c1778d8;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index f9bf5c8d..908539a8 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -62,6 +62,7 @@ #include #include #include +#include #include #include "smp.h" @@ -73,10 +74,6 @@ #include "tlsfixup.h" #include "../liblttng-ust/compat.h" /* For ENODATA */ -#ifndef max -#define max(a, b) ((a) > (b) ? (a) : (b)) -#endif - /* Print DBG() messages about events lost only every 1048576 hits */ #define DBG_PRINT_NR_LOST (1UL << 20) @@ -87,6 +84,11 @@ #define LTTNG_UST_RING_BUFFER_GET_RETRY 10 #define LTTNG_UST_RING_BUFFER_RETRY_DELAY_MS 10 +/* + * Non-static to ensure the compiler does not optimize away the xor. + */ +uint8_t lttng_crash_magic_xor[] = RB_CRASH_DUMP_ABI_MAGIC_XOR; + /* * Use POSIX SHM: shm_open(3) and shm_unlink(3). * close(2) to close the fd returned by shm_open. @@ -158,10 +160,14 @@ static struct timer_signal_data timer_signal = { void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle) { - struct channel *chan = shmp(handle, buf->backend.chan); - const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + struct channel *chan; + const struct lttng_ust_lib_ring_buffer_config *config; unsigned int i; + chan = shmp(handle, buf->backend.chan); + if (!chan) + abort(); + config = &chan->backend.config; /* * Reset iterator first. It will put the subbuffer if it currently holds * it. @@ -207,6 +213,95 @@ void channel_reset(struct channel *chan) /* Don't reset reader reference count */ } +static +void init_crash_abi(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_crash_abi *crash_abi, + struct lttng_ust_lib_ring_buffer *buf, + struct channel_backend *chanb, + struct shm_object *shmobj, + struct lttng_ust_shm_handle *handle) +{ + int i; + + for (i = 0; i < RB_CRASH_DUMP_ABI_MAGIC_LEN; i++) + crash_abi->magic[i] = lttng_crash_magic_xor[i] ^ 0xFF; + crash_abi->mmap_length = shmobj->memory_map_size; + crash_abi->endian = RB_CRASH_ENDIAN; + crash_abi->major = RB_CRASH_DUMP_ABI_MAJOR; + crash_abi->minor = RB_CRASH_DUMP_ABI_MINOR; + crash_abi->word_size = sizeof(unsigned long); + crash_abi->layout_type = LTTNG_CRASH_TYPE_UST; + + /* Offset of fields */ + crash_abi->offset.prod_offset = + (uint32_t) ((char *) &buf->offset - (char *) buf); + crash_abi->offset.consumed_offset = + (uint32_t) ((char *) &buf->consumed - (char *) buf); + crash_abi->offset.commit_hot_array = + (uint32_t) ((char *) shmp(handle, buf->commit_hot) - (char *) buf); + crash_abi->offset.commit_hot_seq = + offsetof(struct commit_counters_hot, seq); + crash_abi->offset.buf_wsb_array = + (uint32_t) ((char *) shmp(handle, buf->backend.buf_wsb) - (char *) buf); + crash_abi->offset.buf_wsb_id = + offsetof(struct lttng_ust_lib_ring_buffer_backend_subbuffer, id); + crash_abi->offset.sb_array = + (uint32_t) ((char *) shmp(handle, buf->backend.array) - (char *) buf); + crash_abi->offset.sb_array_shmp_offset = + offsetof(struct lttng_ust_lib_ring_buffer_backend_pages_shmp, + shmp._ref.offset); + crash_abi->offset.sb_backend_p_offset = + offsetof(struct lttng_ust_lib_ring_buffer_backend_pages, + p._ref.offset); + + /* Field length */ + crash_abi->length.prod_offset = sizeof(buf->offset); + crash_abi->length.consumed_offset = sizeof(buf->consumed); + crash_abi->length.commit_hot_seq = + sizeof(((struct commit_counters_hot *) NULL)->seq); + crash_abi->length.buf_wsb_id = + sizeof(((struct lttng_ust_lib_ring_buffer_backend_subbuffer *) NULL)->id); + crash_abi->length.sb_array_shmp_offset = + sizeof(((struct lttng_ust_lib_ring_buffer_backend_pages_shmp *) NULL)->shmp._ref.offset); + crash_abi->length.sb_backend_p_offset = + sizeof(((struct lttng_ust_lib_ring_buffer_backend_pages *) NULL)->p._ref.offset); + + /* Array stride */ + crash_abi->stride.commit_hot_array = + sizeof(struct commit_counters_hot); + crash_abi->stride.buf_wsb_array = + sizeof(struct lttng_ust_lib_ring_buffer_backend_subbuffer); + crash_abi->stride.sb_array = + sizeof(struct lttng_ust_lib_ring_buffer_backend_pages_shmp); + + /* Buffer constants */ + crash_abi->buf_size = chanb->buf_size; + crash_abi->subbuf_size = chanb->subbuf_size; + crash_abi->num_subbuf = chanb->num_subbuf; + crash_abi->mode = (uint32_t) chanb->config.mode; + + if (config->cb.content_size_field) { + size_t offset, length; + + config->cb.content_size_field(config, &offset, &length); + crash_abi->offset.content_size = offset; + crash_abi->length.content_size = length; + } else { + crash_abi->offset.content_size = 0; + crash_abi->length.content_size = 0; + } + if (config->cb.packet_size_field) { + size_t offset, length; + + config->cb.packet_size_field(config, &offset, &length); + crash_abi->offset.packet_size = offset; + crash_abi->length.packet_size = length; + } else { + crash_abi->offset.packet_size = 0; + crash_abi->length.packet_size = 0; + } +} + /* * Must be called under cpu hotplug protection. */ @@ -226,18 +321,12 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, if (buf->backend.allocated) return 0; - ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, - cpu, handle, shmobj); - if (ret) - return ret; - align_shm(shmobj, __alignof__(struct commit_counters_hot)); set_shmp(buf->commit_hot, zalloc_shm(shmobj, sizeof(struct commit_counters_hot) * chan->backend.num_subbuf)); if (!shmp(handle, buf->commit_hot)) { - ret = -ENOMEM; - goto free_chanbuf; + return -ENOMEM; } align_shm(shmobj, __alignof__(struct commit_counters_cold)); @@ -249,6 +338,12 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, goto free_commit; } + ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, + cpu, handle, shmobj); + if (ret) { + goto free_init; + } + /* * Write the subbuffer header for first subbuffer so we know the total * duration of data gathering. @@ -259,12 +354,16 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan)); config->cb.buffer_begin(buf, tsc, 0, handle); v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->cc); + v_add(config, subbuf_header_size, &shmp_index(handle, buf->commit_hot, 0)->seq); if (config->cb.buffer_create) { ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle); if (ret) - goto free_init; + goto free_chanbuf; } + + init_crash_abi(config, &buf->crash_abi, buf, chanb, shmobj, handle); + buf->backend.allocated = 1; return 0; @@ -301,6 +400,9 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc) for_each_possible_cpu(cpu) { struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); + + if (!buf) + abort(); if (uatomic_read(&buf->active_readers)) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, chan->handle); @@ -309,6 +411,8 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc) struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + if (!buf) + abort(); if (uatomic_read(&buf->active_readers)) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, chan->handle); @@ -336,6 +440,8 @@ void lib_ring_buffer_channel_do_read(struct channel *chan) struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); + if (!buf) + abort(); if (uatomic_read(&buf->active_readers) && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { @@ -346,6 +452,8 @@ void lib_ring_buffer_channel_do_read(struct channel *chan) struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + if (!buf) + abort(); if (uatomic_read(&buf->active_readers) && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { @@ -530,7 +638,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) } its.it_value.tv_sec = chan->switch_timer_interval / 1000000; - its.it_value.tv_nsec = chan->switch_timer_interval % 1000000; + its.it_value.tv_nsec = (chan->switch_timer_interval % 1000000) * 1000; its.it_interval.tv_sec = its.it_value.tv_sec; its.it_interval.tv_nsec = its.it_value.tv_nsec; @@ -584,7 +692,7 @@ void lib_ring_buffer_channel_read_timer_start(struct channel *chan) } its.it_value.tv_sec = chan->read_timer_interval / 1000000; - its.it_value.tv_nsec = chan->read_timer_interval % 1000000; + its.it_value.tv_nsec = (chan->read_timer_interval % 1000000) * 1000; its.it_interval.tv_sec = its.it_value.tv_sec; its.it_interval.tv_nsec = its.it_value.tv_nsec; @@ -674,6 +782,8 @@ static void channel_free(struct channel *chan, * padding to let readers get those sub-buffers. * Used for live streaming. * @read_timer_interval: Time interval (in us) to wake up pending readers. + * @stream_fds: array of stream file descriptors. + * @nr_stream_fds: number of file descriptors in array. * * Holds cpu hotplug. * Returns NULL on failure. @@ -686,7 +796,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff void *priv_data_init, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, - unsigned int read_timer_interval) + unsigned int read_timer_interval, + const int *stream_fds, int nr_stream_fds) { int ret; size_t shmsize, chansize; @@ -700,6 +811,9 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff else nr_streams = 1; + if (nr_stream_fds != nr_streams) + return NULL; + if (lib_ring_buffer_check_config(config, switch_timer_interval, read_timer_interval)) return NULL; @@ -723,7 +837,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff shmsize += priv_data_size; /* Allocate normal memory for channel (not shared) */ - shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM); + shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM, + -1); if (!shmobj) goto error_append; /* struct channel is at object 0, offset 0 (hardcoded) */ @@ -753,7 +868,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff } ret = channel_backend_init(&chan->backend, name, config, - subbuf_size, num_subbuf, handle); + subbuf_size, num_subbuf, handle, + stream_fds); if (ret) goto error_backend_init; @@ -1372,7 +1488,8 @@ void lib_ring_buffer_print_errors(struct channel *chan, /* * lib_ring_buffer_switch_old_start: Populate old subbuffer header. * - * Only executed when the buffer is finalized, in SWITCH_FLUSH. + * Only executed by SWITCH_FLUSH, which can be issued while tracing is + * active or at buffer finalization (destroy). */ static void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, @@ -1546,12 +1663,14 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, unsigned long sb_index, commit_count; /* - * We are performing a SWITCH_FLUSH. At this stage, there are no - * concurrent writes into the buffer. + * We are performing a SWITCH_FLUSH. There may be concurrent + * writes into the buffer if e.g. invoked while performing a + * snapshot on an active trace. * - * The client does not save any header information. Don't - * switch empty subbuffer on finalize, because it is invalid to - * deliver a completely empty subbuffer. + * If the client does not save any header information + * (sub-buffer header size == 0), don't switch empty subbuffer + * on finalize, because it is invalid to deliver a completely + * empty subbuffer. */ if (!config->cb.subbuffer_header_size()) return -1;