X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=db95179dc8f6f581efac87918d519728a05dec8c;hb=3ef395a9e7ef9adfc3cbb7d93ab8618bcc373f70;hp=b64ad04aaa3cd101b3275181b5b382a40fb62385;hpb=641e88732ae04f89c397ef4400babd5a92c7397f;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b64ad04aa..db95179dc 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -17,6 +17,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#include #define _LGPL_SOURCE #include #include @@ -120,26 +121,6 @@ error: return ret; } -/* - * Allocate and return a consumer channel object. - */ -static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, - const uint64_t *chunk_id, const char *pathname, const char *name, - uint64_t relayd_id, uint64_t key, enum lttng_event_output output, - uint64_t tracefile_size, uint64_t tracefile_count, - uint64_t session_id_per_pid, unsigned int monitor, - unsigned int live_timer_interval, - const char *root_shm_path, const char *shm_path) -{ - assert(pathname); - assert(name); - - return consumer_allocate_channel(key, session_id, chunk_id, pathname, - name, relayd_id, output, tracefile_size, - tracefile_count, session_id_per_pid, monitor, - live_timer_interval, root_shm_path, shm_path); -} - /* * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the * error value if applicable is set in it else it is kept untouched. @@ -156,7 +137,9 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, assert(channel); assert(ctx); - stream = consumer_allocate_stream(channel->key, + stream = consumer_allocate_stream( + channel, + channel->key, key, channel->name, channel->relayd_id, @@ -185,7 +168,6 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, } consumer_stream_update_channel_attributes(stream, channel); - stream->chan = channel; error: if (_alloc_ret) { @@ -1079,7 +1061,6 @@ error_stream: * Clean up the stream completly because the next snapshot will use a new * metadata stream. */ - pthread_mutex_lock(&metadata_stream->lock); consumer_stream_destroy(metadata_stream, NULL); cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; @@ -1089,6 +1070,35 @@ error: return ret; } +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base; + + mmap_base = ustctl_get_mmap_base(stream->ustream); + if (!mmap_base) { + ERR("Failed to get mmap base for stream `%s`", + stream->name); + ret = -EPERM; + goto error; + } + + ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset); + if (ret != 0) { + ERR("Failed to get mmap offset for stream `%s`", stream->name); + ret = -EINVAL; + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; + +} + /* * Take a snapshot of all the stream of a channel. * RCU read-side lock and the channel lock must be held by the caller. @@ -1191,6 +1201,8 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; unsigned long len, padded_len; + const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; health_code_update(); @@ -1220,8 +1232,16 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, goto error_put_subbuf; } - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, - padded_len - len, NULL); + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + stream, &subbuf_view, padded_len - len, + NULL); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -1450,19 +1470,21 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, }; /* Create a plain object and reserve a channel key. */ - channel = allocate_channel(msg.u.ask_channel.session_id, + channel = consumer_allocate_channel( + msg.u.ask_channel.key, + msg.u.ask_channel.session_id, msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL, msg.u.ask_channel.pathname, msg.u.ask_channel.name, msg.u.ask_channel.relayd_id, - msg.u.ask_channel.key, (enum lttng_event_output) msg.u.ask_channel.output, msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count, msg.u.ask_channel.session_id_per_pid, msg.u.ask_channel.monitor, msg.u.ask_channel.live_timer_interval, + msg.u.ask_channel.is_live, msg.u.ask_channel.root_shm_path, msg.u.ask_channel.shm_path); if (!channel) { @@ -2171,7 +2193,6 @@ end_msg_sessiond: end_channel_error: if (channel) { - pthread_mutex_unlock(&channel->lock); /* * Free channel here since no one has a reference to it. We don't * free after that because a stream can store this pointer. @@ -2198,31 +2219,6 @@ end: return ret; } -/* - * Wrapper over the mmap() read offset from ust-ctl library. Since this can be - * compiled out, we isolate it in this library. - */ -int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream, - unsigned long *off) -{ - assert(stream); - assert(stream->ustream); - - return ustctl_get_mmap_read_offset(stream->ustream, off); -} - -/* - * Wrapper over the mmap() read offset from ust-ctl library. Since this can be - * compiled out, we isolate it in this library. - */ -void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream) -{ - assert(stream); - assert(stream->ustream); - - return ustctl_get_mmap_base(stream->ustream); -} - void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) { @@ -2584,37 +2580,59 @@ end: * interacting with sessiond, else we cause a deadlock with live * awaiting on metadata to be pushed out. * + * The RCU read side lock must be held by the caller. + * * Return 0 if new metadatda is available, EAGAIN if the metadata stream * is empty or a negative value on error. */ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *metadata) + struct lttng_consumer_stream *metadata_stream) { int ret; int retry = 0; + struct lttng_consumer_channel *metadata_channel; assert(ctx); - assert(metadata); + assert(metadata_stream); - pthread_mutex_unlock(&metadata->lock); + metadata_channel = metadata_stream->chan; + pthread_mutex_unlock(&metadata_stream->lock); /* * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0); - pthread_mutex_lock(&metadata->lock); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); + pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { goto end; } - ret = commit_one_metadata_packet(metadata); + /* + * The metadata stream and channel can be deleted while the + * metadata stream lock was released. The streamed is checked + * for deletion before we use it further. + * + * Note that it is safe to access a logically-deleted stream since its + * existence is still guaranteed by the RCU read side lock. However, + * it should no longer be used. The close/deletion of the metadata + * channel and stream already guarantees that all metadata has been + * consumed. Therefore, there is nothing left to do in this function. + */ + if (consumer_stream_is_deleted(metadata_stream)) { + DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", + metadata_stream->key); + ret = 0; + goto end; + } + + ret = commit_one_metadata_packet(metadata_stream); if (ret <= 0) { goto end; } else if (ret > 0) { retry = 1; } - ret = ustctl_snapshot(metadata->ustream); + ret = ustctl_snapshot(metadata_stream->ustream); if (ret < 0) { if (errno != EAGAIN) { ERR("Sync metadata, taking UST snapshot"); @@ -2758,6 +2776,8 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; + const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; assert(stream); assert(stream->ustream); @@ -2867,11 +2887,21 @@ retry: padding = len - subbuf_size; + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + write_index = 0; + goto error_put_subbuf; + } + + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); + /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); + ret = lttng_consumer_on_read_subbuffer_mmap( + ctx, stream, &subbuf_view, padding, &index); /* - * The mmap operation should write subbuf_size amount of data when network - * streaming or the full padding (len) size when we are _not_ streaming. + * The mmap operation should write subbuf_size amount of data when + * network streaming or the full padding (len) size when we are _not_ + * streaming. */ if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { @@ -2888,6 +2918,7 @@ retry: ret, len, subbuf_size); write_index = 0; } +error_put_subbuf: err = ustctl_put_next_subbuf(ustream); assert(err == 0);