*
* This function MUST be called with the consumer_data lock acquired.
*/
-void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret;
struct lttng_ht_iter iter;
/* Destroy the relayd if refcount is 0 */
if (uatomic_read(&relayd->refcount) == 0) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
rcu_read_unlock();
- if (!--stream->chan->refcount) {
+ uatomic_dec(&stream->chan->refcount);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
free_chan = stream->chan;
}
-
call_rcu(&stream->node.head, consumer_free_stream);
end:
consumer_data.need_update = 1;
pthread_mutex_unlock(&consumer_data.lock);
- if (free_chan)
+ if (free_chan) {
consumer_del_channel(free_chan);
+ }
}
struct lttng_consumer_stream *consumer_allocate_stream(
uid_t uid,
gid_t gid,
int net_index,
- int metadata_flag)
+ int metadata_flag,
+ int *alloc_ret)
{
struct lttng_consumer_stream *stream;
int ret;
stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
perror("malloc struct lttng_consumer_stream");
- goto end;
+ *alloc_ret = -ENOMEM;
+ return NULL;
}
stream->chan = consumer_find_channel(channel_key);
if (!stream->chan) {
- perror("Unable to find channel key");
- goto end;
+ *alloc_ret = -ENOENT;
+ goto error;
}
stream->chan->refcount++;
stream->key = stream_key;
stream->cpu = stream->chan->cpucount++;
ret = lttng_ustconsumer_allocate_stream(stream);
if (ret) {
- free(stream);
- return NULL;
+ *alloc_ret = -EINVAL;
+ goto error;
}
break;
default:
ERR("Unknown consumer_data type");
- assert(0);
- goto end;
+ *alloc_ret = -EINVAL;
+ goto error;
}
- DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
- stream->path_name, stream->key,
- stream->shm_fd,
- stream->wait_fd,
- (unsigned long long) stream->mmap_len,
- stream->out_fd,
+
+ /*
+ * When nb_init_streams reaches 0, we don't need to trigger any action in
+ * terms of destroying the associated channel, because the action that
+ * causes the count to become 0 also causes a stream to be added. The
+ * channel deletion will thus be triggered by the following removal of this
+ * stream.
+ */
+ if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+ uatomic_dec(&stream->chan->nb_init_streams);
+ }
+
+ DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
+ " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
+ stream->shm_fd, stream->wait_fd,
+ (unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx);
-end:
return stream;
+
+error:
+ free(stream);
+ return NULL;
}
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
*/
-
-int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
+static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
struct lttng_ht_node_ulong *node;
int channel_key,
int shm_fd, int wait_fd,
uint64_t mmap_len,
- uint64_t max_sb_size)
+ uint64_t max_sb_size,
+ unsigned int nb_init_streams)
{
struct lttng_consumer_channel *channel;
int ret;
channel->mmap_len = mmap_len;
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
+ channel->nb_init_streams = nb_init_streams;
lttng_ht_node_init_ulong(&channel->node, channel->key);
switch (consumer_data.type) {
return;
}
+ rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
ret = lttng_ht_del(ht, &iter);
assert(!ret);
free(stream);
}
+ rcu_read_unlock();
lttng_ht_destroy(ht);
}
static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
{
int ret;
- struct lttng_consumer_channel *free_chan = NULL;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
rcu_read_unlock();
/* Atomically decrement channel refcount since other threads can use it. */
uatomic_dec(&stream->chan->refcount);
- if (!uatomic_read(&stream->chan->refcount)) {
- free_chan = stream->chan;
- }
-
- if (free_chan) {
- consumer_del_channel(free_chan);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
+ /* Go for channel deletion! */
+ consumer_del_channel(stream->chan);
}
free(stream);
/* Check the metadata pipe for incoming metadata. */
if (pollfd == ctx->consumer_metadata_pipe[0]) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
+ if (revents & (LPOLLERR | LPOLLHUP )) {
DBG("Metadata thread pipe hung up");
/*
* Remove the pipe from the poll set and continue the loop
close(ctx->consumer_metadata_pipe[0]);
continue;
} else if (revents & LPOLLIN) {
- stream = zmalloc(sizeof(struct lttng_consumer_stream));
- if (stream == NULL) {
- PERROR("zmalloc metadata consumer stream");
- goto error;
- }
-
do {
- /* Get the stream and add it to the local hash table */
- ret = read(pollfd, stream,
- sizeof(struct lttng_consumer_stream));
+ /* Get the stream pointer received */
+ ret = read(pollfd, &stream, sizeof(stream));
} while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(struct lttng_consumer_stream)) {
+ if (ret < 0 ||
+ ret < sizeof(struct lttng_consumer_stream *)) {
PERROR("read metadata stream");
- free(stream);
/*
* Let's continue here and hope we can still work
* without stopping the consumer. XXX: Should we?
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
+ rcu_read_lock();
/* The node should be init at this point */
lttng_ht_add_unique_ulong(metadata_ht,
&stream->waitfd_node);
+ rcu_read_unlock();
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
/* From here, the event is a metadata wait fd */
+ rcu_read_lock();
lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node == NULL) {
/* FD not found, continue loop */
+ rcu_read_unlock();
continue;
}
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN) {
+ rcu_read_unlock();
goto end;
} else if (len > 0) {
stream->data_read = 1;
* Remove the stream from the hash table since there is no data
* left on the fd because we previously did a read on the buffer.
*/
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
+ if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Metadata fd %d is hup|err|nval.", pollfd);
if (!stream->hangup_flush_done
&& (consumer_data.type == LTTNG_CONSUMER32_UST
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN) {
+ rcu_read_unlock();
goto end;
}
}
/* Removing it from hash table, poll set and free memory */
lttng_ht_del(metadata_ht, &iter);
+
lttng_poll_del(&events, stream->wait_fd);
consumer_del_metadata_stream(stream);
}
+ rcu_read_unlock();
}
}
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
- consumer_add_relayd(relayd);
+ add_relayd(relayd);
/* All good! */
ret = 0;