{
struct lttng_consumer_stream *iter;
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
if (iter->key == key) {
DBG("Found stream key %d", key);
return NULL;
}
+static void consumer_steal_stream_key(int key)
+{
+ struct lttng_consumer_stream *stream;
+
+ stream = consumer_find_stream(key);
+ if (stream)
+ stream->key = -1;
+}
+
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_consumer_channel *iter;
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
if (iter->key == key) {
DBG("Found channel key %d", key);
return NULL;
}
+static void consumer_steal_channel_key(int key)
+{
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (channel)
+ channel->key = -1;
+}
+
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
struct lttng_consumer_stream *stream;
int ret;
- stream = malloc(sizeof(*stream));
+ stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
perror("malloc struct lttng_consumer_stream");
goto end;
int ret = 0;
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_stream(stream->key)) {
- ret = -1;
- goto end;
- }
+ /* Steal stream identifier, for UST */
+ consumer_steal_stream_key(stream->key);
cds_list_add(&stream->list, &consumer_data.stream_list.head);
consumer_data.stream_count++;
consumer_data.need_update = 1;
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
- int ret = 0;
-
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_channel(channel->key)) {
- ret = -1;
- goto end;
- }
+ /* Steal channel identifier, for UST */
+ consumer_steal_channel_key(channel->key);
cds_list_add(&channel->list, &consumer_data.channel_list.head);
-end:
pthread_mutex_unlock(&consumer_data.lock);
- return ret;
+ return 0;
}
/*
consumer_data.type == type);
consumer_data.type = type;
- ctx = malloc(sizeof(struct lttng_consumer_local_data));
+ ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
perror("allocating context");
goto error;
int tmp2;
struct lttng_consumer_local_data *ctx = data;
- local_stream = malloc(sizeof(struct lttng_consumer_stream));
+ local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
high_prio = 0;
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
perror("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- local_stream = malloc((consumer_data.stream_count + 1) *
+ local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
perror("local_stream malloc");
pollfd[i].fd);
if (!local_stream[i]->hangup_flush_done) {
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
- /* try reading after flush */
- ret = ctx->on_buffer_ready(local_stream[i], ctx);
- /* it's ok to have an unavailable sub-buffer */
- if (ret == EAGAIN) {
- ret = 0;
- }
+ /* read after flush */
+ do {
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ } while (ret == EAGAIN);
}
} else {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);