{
int ret;
struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream, *stmp;
DBG("Consumer delete channel key %" PRIu64, channel->key);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ free(stream);
+ }
lttng_ustconsumer_del_channel(channel);
break;
default:
/*
* Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
/* Channel already exist. Ignore the insertion */
ERR("Consumer add channel key %" PRIu64 " already exists!",
channel->key);
- ret = LTTNG_ERR_KERN_CHAN_EXIST;
+ ret = -EEXIST;
goto end;
}
/* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
- sizeof(struct lttng_consumer_stream));
+ sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&consumer_data.lock);
break;
case CONSUMER_CHANNEL_DEL:
{
+ struct lttng_consumer_stream *stream, *stmp;
+
rcu_read_lock();
chan = consumer_find_channel(key);
if (!chan) {
break;
}
lttng_poll_del(&events, chan->wait_fd);
+ iter.iter.node = &chan->wait_fd_node.node;
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
consumer_close_channel_streams(chan);
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ uatomic_sub(&stream->chan->refcount, 1);
+ assert(&chan->refcount);
+ free(stream);
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
/*
* Release our own refcount. Force channel deletion even if
* streams were not initialized.
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
+ assert(cds_list_empty(&chan->streams.head));
consumer_close_channel_streams(chan);
/* Release our own refcount */
goto end;
}
- ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
-
/* prepare the FDs to poll : to client socket and the should_quit pipe */
consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
consumer_sockpoll[0].events = POLLIN | POLLPRI;
WARN("On accept");
goto end;
}
- ret = fcntl(sock, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
/*
* Setup metadata socket which is the second socket connection on the