{
struct lttng_consumer_stream *iter;
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
if (iter->key == key) {
DBG("Found stream key %d", key);
return NULL;
}
+static void consumer_steal_stream_key(int key)
+{
+ struct lttng_consumer_stream *stream;
+
+ stream = consumer_find_stream(key);
+ if (stream)
+ stream->key = -1;
+}
+
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_consumer_channel *iter;
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
if (iter->key == key) {
DBG("Found channel key %d", key);
return NULL;
}
+static void consumer_steal_channel_key(int key)
+{
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (channel)
+ channel->key = -1;
+}
+
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
}
}
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
lttng_ustconsumer_del_stream(stream);
break;
default:
if (stream->out_fd >= 0) {
close(stream->out_fd);
}
- if (stream->wait_fd >= 0) {
+ if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
close(stream->wait_fd);
}
- if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
+ if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
+ && !stream->shm_fd_is_copy) {
close(stream->shm_fd);
}
if (!--stream->chan->refcount)
struct lttng_consumer_stream *stream;
int ret;
- stream = malloc(sizeof(*stream));
+ stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
perror("malloc struct lttng_consumer_stream");
goto end;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ stream->cpu = stream->chan->cpucount++;
ret = lttng_ustconsumer_allocate_stream(stream);
if (ret) {
free(stream);
int ret = 0;
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_stream(stream->key)) {
- ret = -1;
- goto end;
- }
+ /* Steal stream identifier, for UST */
+ consumer_steal_stream_key(stream->key);
cds_list_add(&stream->list, &consumer_data.stream_list.head);
consumer_data.stream_count++;
consumer_data.need_update = 1;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
/* Streams are in CPU number order (we rely on this) */
stream->cpu = stream->chan->nr_streams++;
break;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
lttng_ustconsumer_del_channel(channel);
break;
default:
perror("munmap");
}
}
- if (channel->wait_fd >= 0) {
+ if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
close(channel->wait_fd);
}
- if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
+ if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
+ && !channel->shm_fd_is_copy) {
close(channel->shm_fd);
}
free(channel);
struct lttng_consumer_channel *channel;
int ret;
- channel = malloc(sizeof(*channel));
+ channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
perror("malloc struct lttng_consumer_channel");
goto end;
channel->mmap_base = NULL;
channel->mmap_len = 0;
break;
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
ret = lttng_ustconsumer_allocate_channel(channel);
if (ret) {
free(channel);
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
- int ret = 0;
-
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_channel(channel->key)) {
- ret = -1;
- goto end;
- }
+ /* Steal channel identifier, for UST */
+ consumer_steal_channel_key(channel->key);
cds_list_add(&channel->list, &consumer_data.channel_list.head);
-end:
pthread_mutex_unlock(&consumer_data.lock);
- return ret;
+ return 0;
}
/*
*/
struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
- int (*buffer_ready)(struct lttng_consumer_stream *stream),
+ int (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
int (*update_stream)(int stream_key, uint32_t state))
consumer_data.type == type);
consumer_data.type = type;
- ctx = malloc(sizeof(struct lttng_consumer_local_data));
+ ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
perror("allocating context");
goto error;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return -ENOSYS;
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_take_snapshot(ctx, stream);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_take_snapshot(ctx, stream);
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
default:
ERR("Unknown consumer_data type");
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
- case LTTNG_CONSUMER_UST:
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
default:
ERR("Unknown consumer_data type");
int tmp2;
struct lttng_consumer_local_data *ctx = data;
- local_stream = malloc(sizeof(struct lttng_consumer_stream));
+ local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
high_prio = 0;
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
perror("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- local_stream = malloc((consumer_data.stream_count + 1) *
+ local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
perror("local_stream malloc");
goto end;
}
- /* No FDs and consumer_quit, kconsumer_cleanup the thread */
+ /* No FDs and consumer_quit, consumer_cleanup the thread */
if (nb_fd == 0 && consumer_quit == 1) {
goto end;
}
* array. We want to prioritize array update over
* low-priority reads.
*/
- if (pollfd[nb_fd].revents == POLLIN) {
+ if (pollfd[nb_fd].revents & POLLIN) {
DBG("consumer_poll_pipe wake up");
tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
if (tmp2 < 0) {
- perror("read kconsumer poll");
+ perror("read consumer poll");
}
continue;
}
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
- switch(pollfd[i].revents) {
- case POLLERR:
+ if (pollfd[i].revents & POLLPRI) {
+ DBG("Urgent read on fd %d", pollfd[i].fd);
+ high_prio = 1;
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ /* it's ok to have an unavailable sub-buffer */
+ if (ret == EAGAIN) {
+ ret = 0;
+ }
+ } else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLHUP:
- DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
- num_hup++;
- break;
- case POLLNVAL:
+ } else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLPRI:
- DBG("Urgent read on fd %d", pollfd[i].fd);
- high_prio = 1;
- ret = ctx->on_buffer_ready(local_stream[i]);
- /* it's ok to have an unavailable sub-buffer */
- if (ret == EAGAIN) {
- ret = 0;
+ } else if ((pollfd[i].revents & POLLHUP) &&
+ !(pollfd[i].revents & POLLIN)) {
+ if (consumer_data.type == LTTNG_CONSUMER32_UST
+ || consumer_data.type == LTTNG_CONSUMER64_UST) {
+ DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
+ pollfd[i].fd);
+ if (!local_stream[i]->hangup_flush_done) {
+ lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+ /* read after flush */
+ do {
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ } while (ret == EAGAIN);
+ }
+ } else {
+ DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
}
- break;
+ consumer_del_stream(local_stream[i]);
+ num_hup++;
}
}
/* Take care of low priority channels. */
if (high_prio == 0) {
for (i = 0; i < nb_fd; i++) {
- if (pollfd[i].revents == POLLIN) {
+ if (pollfd[i].revents & POLLIN) {
DBG("Normal read on fd %d", pollfd[i].fd);
- ret = ctx->on_buffer_ready(local_stream[i]);
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable subbuffer */
if (ret == EAGAIN) {
ret = 0;
goto end;
}
- DBG("Sending ready command to ltt-sessiond");
+ DBG("Sending ready command to lttng-sessiond");
ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
/* return < 0 on error, but == 0 is not fatal */
if (ret < 0) {
- ERR("Error sending ready command to ltt-sessiond");
+ ERR("Error sending ready command to lttng-sessiond");
goto end;
}
}
return NULL;
}
+
+int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_read_subbuffer(stream, ctx);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_read_subbuffer(stream, ctx);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_on_recv_stream(stream);
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_on_recv_stream(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}