* Thread polls on metadata file descriptor and write them on disk or on the
* network.
*/
-void *lttng_consumer_thread_poll_metadata(void *data)
+void *consumer_thread_metadata_poll(void *data)
{
int ret, i, pollfd;
uint32_t revents, nb_fd;
lttng_ustconsumer_on_stream_hangup(stream);
/* We just flushed the stream now read it. */
- len = ctx->on_buffer_ready(stream, ctx);
- /* It's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
- rcu_read_unlock();
- goto end;
- }
+ do {
+ len = ctx->on_buffer_ready(stream, ctx);
+ /*
+ * We don't check the return value here since if we get
+ * a negative len, it means an error occured thus we
+ * simply remove it from the poll set and free the
+ * stream.
+ */
+ } while (len > 0);
}
lttng_poll_del(&events, stream->wait_fd);
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
+ if (len < 0 && len != -EAGAIN && len != -ENODATA) {
rcu_read_unlock();
goto end;
} else if (len > 0) {
* This thread polls the fds in the set to consume the data and write
* it to tracefile if necessary.
*/
-void *lttng_consumer_thread_poll_fds(void *data)
+void *consumer_thread_data_poll(void *data)
{
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
- pthread_t metadata_thread;
- void *status;
rcu_register_thread();
- /* Start metadata polling thread */
- ret = pthread_create(&metadata_thread, NULL,
- lttng_consumer_thread_poll_metadata, (void *) ctx);
- if (ret < 0) {
- PERROR("pthread_create metadata thread");
- goto end;
- }
-
local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
high_prio = 1;
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
+ if (len < 0 && len != -EAGAIN && len != -ENODATA) {
goto end;
} else if (len > 0) {
local_stream[i]->data_read = 1;
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
+ if (len < 0 && len != -EAGAIN && len != -ENODATA) {
goto end;
} else if (len > 0) {
local_stream[i]->data_read = 1;
/*
* Close the write side of the pipe so epoll_wait() in
- * lttng_consumer_thread_poll_metadata can catch it. The thread is
- * monitoring the read side of the pipe. If we close them both, epoll_wait
- * strangely does not return and could create a endless wait period if the
- * pipe is the only tracked fd in the poll set. The thread will take care
- * of closing the read side.
+ * consumer_thread_metadata_poll can catch it. The thread is monitoring the
+ * read side of the pipe. If we close them both, epoll_wait strangely does
+ * not return and could create a endless wait period if the pipe is the
+ * only tracked fd in the poll set. The thread will take care of closing
+ * the read side.
*/
close(ctx->consumer_metadata_pipe[1]);
- if (ret) {
- ret = pthread_join(metadata_thread, &status);
- if (ret < 0) {
- PERROR("pthread_join metadata thread");
- }
- }
rcu_unregister_thread();
return NULL;
* This thread listens on the consumerd socket and receives the file
* descriptors from the session daemon.
*/
-void *lttng_consumer_thread_receive_fds(void *data)
+void *consumer_thread_sessiond_poll(void *data)
{
int sock, client_socket, ret;
/*