From: Mathieu Desnoyers Date: Fri, 23 Sep 2011 15:39:44 +0000 (-0400) Subject: Update lib ring buffer for external consumer X-Git-Url: http://git.liburcu.org/?p=ust.git;a=commitdiff_plain;h=824f40b81426c6ac82685251018dae00947786a9 Update lib ring buffer for external consumer Signed-off-by: Mathieu Desnoyers --- diff --git a/include/Makefile.am b/include/Makefile.am index dcec467..df20225 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -10,9 +10,10 @@ nobase_include_HEADERS = \ ust/ringbuffer-abi.h \ ust/lttng-tracer.h \ ust/usterr-signal-safe.h \ - ust/core.h \ + ust/config.h \ ust/share.h \ - ust/ust.h + ust/ust.h \ + ust/core.h # note: usterr-signal-safe.h, core.h and share.h need namespace cleanup. @@ -21,15 +22,15 @@ noinst_HEADERS = \ usterr.h \ ust_snprintf.h \ ust/compat.h \ - ust/config.h \ ust/marker-internal.h \ ust/tracepoint-internal.h \ ust/clock.h \ ust/probe-internal.h \ - ust/processor.h \ ust/kcompat/kcompat.h \ ust/kcompat/jhash.h \ ust/kcompat/compiler.h \ ust/kcompat/types.h \ ust/stringify.h \ - ust/wait.h + ust/wait.h \ + ust/ringbuffer-config.h \ + ust/processor.h diff --git a/libringbuffer/frontend.h b/libringbuffer/frontend.h index a88a400..9fa0f04 100644 --- a/libringbuffer/frontend.h +++ b/libringbuffer/frontend.h @@ -62,7 +62,8 @@ int channel_handle_add_stream(struct shm_handle *handle, * channel. */ extern -void *channel_destroy(struct channel *chan, struct shm_handle *handle); +void *channel_destroy(struct channel *chan, struct shm_handle *handle, + int shadow); /* Buffer read operations */ @@ -83,9 +84,11 @@ extern struct lib_ring_buffer *channel_get_ring_buffer( int *shm_fd, int *wait_fd, uint64_t *memory_map_size); extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf, - struct shm_handle *handle); + struct shm_handle *handle, + int shadow); extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf, - struct shm_handle *handle); + struct shm_handle *handle, + int shadow); /* * Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer. diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h index 1fb08df..a6b96c1 100644 --- a/libringbuffer/frontend_internal.h +++ b/libringbuffer/frontend_internal.h @@ -376,7 +376,8 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free. */ if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER - && uatomic_read(&buf->active_readers) + && (uatomic_read(&buf->active_readers) + || uatomic_read(&buf->active_shadow_readers)) && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref); diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h index ad7848c..76ad7ff 100644 --- a/libringbuffer/frontend_types.h +++ b/libringbuffer/frontend_types.h @@ -87,6 +87,7 @@ struct lib_ring_buffer { * Active readers count * standard atomic access (shared) */ + long active_shadow_readers; /* Dropped records */ union v_atomic records_lost_full; /* Buffer full */ union v_atomic records_lost_wrap; /* Nested wrap-around */ diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 5d87782..5e6d4df 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -249,7 +249,7 @@ static void switch_buffer_timer(unsigned long data) /* * Only flush buffers periodically if readers are active. */ - if (uatomic_read(&buf->active_readers)) + if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers)) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle); //TODO timers @@ -307,7 +307,7 @@ static void read_buffer_timer(unsigned long data) CHAN_WARN_ON(chan, !buf->backend.allocated); - if (uatomic_read(&buf->active_readers) + if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers)) && lib_ring_buffer_poll_deliver(config, buf, chan)) { //TODO //wake_up_interruptible(&buf->read_wait); @@ -395,11 +395,13 @@ static void channel_unregister_notifiers(struct channel *chan, //channel_backend_unregister_notifiers(&chan->backend); } -static void channel_free(struct channel *chan, struct shm_handle *handle) +static void channel_free(struct channel *chan, struct shm_handle *handle, + int shadow) { int ret; - channel_backend_free(&chan->backend, handle); + if (!shadow) + channel_backend_free(&chan->backend, handle); /* chan is freed by shm teardown */ shm_object_table_destroy(handle->table); free(handle); @@ -550,9 +552,10 @@ int channel_handle_add_stream(struct shm_handle *handle, } static -void channel_release(struct channel *chan, struct shm_handle *handle) +void channel_release(struct channel *chan, struct shm_handle *handle, + int shadow) { - channel_free(chan, handle); + channel_free(chan, handle, shadow); } /** @@ -566,12 +569,18 @@ void channel_release(struct channel *chan, struct shm_handle *handle) * They should release their handle at that point. Returns the private * data pointer. */ -void *channel_destroy(struct channel *chan, struct shm_handle *handle) +void *channel_destroy(struct channel *chan, struct shm_handle *handle, + int shadow) { const struct lib_ring_buffer_config *config = &chan->backend.config; void *priv; int cpu; + if (shadow) { + channel_release(chan, handle, shadow); + return NULL; + } + channel_unregister_notifiers(chan, handle); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { @@ -615,7 +624,7 @@ void *channel_destroy(struct channel *chan, struct shm_handle *handle) * descriptor directly. No need to refcount. */ priv = chan->backend.priv; - channel_release(chan, handle); + channel_release(chan, handle, shadow); return priv; } @@ -642,10 +651,17 @@ struct lib_ring_buffer *channel_get_ring_buffer( } int lib_ring_buffer_open_read(struct lib_ring_buffer *buf, - struct shm_handle *handle) + struct shm_handle *handle, + int shadow) { struct channel *chan = shmp(handle, buf->backend.chan); + if (shadow) { + if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0) + return -EBUSY; + cmm_smp_mb(); + return 0; + } if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0) return -EBUSY; cmm_smp_mb(); @@ -653,10 +669,17 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf, } void lib_ring_buffer_release_read(struct lib_ring_buffer *buf, - struct shm_handle *handle) + struct shm_handle *handle, + int shadow) { struct channel *chan = shmp(handle, buf->backend.chan); + if (shadow) { + CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1); + cmm_smp_mb(); + uatomic_dec(&buf->active_shadow_readers); + return; + } CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); cmm_smp_mb(); uatomic_dec(&buf->active_readers); @@ -734,7 +757,8 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, struct channel *chan = shmp(handle, bufb->chan); unsigned long consumed; - CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1 + && uatomic_read(&buf->active_shadow_readers) != 1); /* * Only push the consumed value forward. @@ -857,7 +881,8 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf, const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; - CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1 + && uatomic_read(&buf->active_shadow_readers) != 1); if (!buf->get_subbuf) { /* diff --git a/libringbuffer/shm.c b/libringbuffer/shm.c index ae63b00..fd0919f 100644 --- a/libringbuffer/shm.c +++ b/libringbuffer/shm.c @@ -214,6 +214,8 @@ void shmp_object_destroy(struct shm_object *obj) assert(0); } for (i = 0; i < 2; i++) { + if (obj->wait_fd[i] < 0) + continue; ret = close(obj->wait_fd[i]); if (ret) { PERROR("close"); diff --git a/libust/ltt-ring-buffer-client.h b/libust/ltt-ring-buffer-client.h index f3a1a27..5f5c315 100644 --- a/libust/ltt-ring-buffer-client.h +++ b/libust/ltt-ring-buffer-client.h @@ -397,7 +397,7 @@ struct ltt_channel *_channel_create(const char *name, static void ltt_channel_destroy(struct ltt_channel *ltt_chan) { - channel_destroy(ltt_chan->chan, ltt_chan->handle); + channel_destroy(ltt_chan->chan, ltt_chan->handle, 0); } static @@ -413,7 +413,7 @@ struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan, buf = channel_get_ring_buffer(&client_config, chan, cpu, handle, shm_fd, wait_fd, memory_map_size); - if (!lib_ring_buffer_open_read(buf, handle)) + if (!lib_ring_buffer_open_read(buf, handle, 0)) return buf; } return NULL; @@ -423,7 +423,7 @@ static void ltt_buffer_read_close(struct lib_ring_buffer *buf, struct shm_handle *handle) { - lib_ring_buffer_release_read(buf, handle); + lib_ring_buffer_release_read(buf, handle, 0); } static diff --git a/libust/ltt-ring-buffer-metadata-client.h b/libust/ltt-ring-buffer-metadata-client.h index fa79485..0102860 100644 --- a/libust/ltt-ring-buffer-metadata-client.h +++ b/libust/ltt-ring-buffer-metadata-client.h @@ -177,7 +177,7 @@ struct ltt_channel *_channel_create(const char *name, static void ltt_channel_destroy(struct ltt_channel *ltt_chan) { - channel_destroy(ltt_chan->chan, ltt_chan->handle); + channel_destroy(ltt_chan->chan, ltt_chan->handle, 0); } static @@ -190,7 +190,7 @@ struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan, buf = channel_get_ring_buffer(&client_config, chan, 0, handle, shm_fd, wait_fd, memory_map_size); - if (!lib_ring_buffer_open_read(buf, handle)) + if (!lib_ring_buffer_open_read(buf, handle, 0)) return buf; return NULL; } @@ -199,7 +199,7 @@ static void ltt_buffer_read_close(struct lib_ring_buffer *buf, struct shm_handle *handle) { - lib_ring_buffer_release_read(buf, handle); + lib_ring_buffer_release_read(buf, handle, 0); } static diff --git a/libust/lttng-ust-abi.c b/libust/lttng-ust-abi.c index cea8c30..16566c8 100644 --- a/libust/lttng-ust-abi.c +++ b/libust/lttng-ust-abi.c @@ -179,13 +179,8 @@ void objd_table_destroy(void) for (i = 0; i < objd_table.allocated_len; i++) { struct obj *obj = _objd_get(i); - const struct objd_ops *ops; - if (!obj) - continue; - ops = obj->u.s.ops; - if (ops->release) - ops->release(i); + (void) objd_unref(i); } free(objd_table.array); objd_table.array = NULL; @@ -754,6 +749,7 @@ int lttng_rb_release(int objd) buf = priv->buf; channel = priv->ltt_chan; free(priv); + channel->ops->buffer_read_close(buf, channel->handle); return objd_unref(channel->objd); } diff --git a/libust/lttng-ust-comm.c b/libust/lttng-ust-comm.c index 2e7e1a3..858e92c 100644 --- a/libust/lttng-ust-comm.c +++ b/libust/lttng-ust-comm.c @@ -294,6 +294,7 @@ end: shm_fd = lum->u.stream.shm_fd; wait_fd = lum->u.stream.wait_fd; break; + case LTTNG_UST_METADATA: case LTTNG_UST_CHANNEL: lur.u.channel.memory_map_size = lum->u.channel.memory_map_size; shm_fd = lum->u.channel.shm_fd; @@ -306,7 +307,9 @@ end: goto error; } - if ((lum->cmd == LTTNG_UST_STREAM || lum->cmd == LTTNG_UST_CHANNEL) + if ((lum->cmd == LTTNG_UST_STREAM + || lum->cmd == LTTNG_UST_CHANNEL + || lum->cmd == LTTNG_UST_METADATA) && lur.ret_code == LTTCOMM_OK) { /* we also need to send the file descriptors. */ ret = lttcomm_send_fds_unix_sock(sock,