Fix: Missing semicolon after debug statement
[lttng-tools.git] / src / common / consumer.c
index 8bac35e7db8aa492f6f3a3056f5dfa623a8588f6..1c6838be27a346bf80d3fe4dff05563c39c73a80 100644 (file)
@@ -47,6 +47,7 @@
 #include "consumer.h"
 #include "consumer-stream.h"
 #include "consumer-testpoint.h"
+#include "align.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -284,6 +285,17 @@ static void free_channel_rcu(struct rcu_head *head)
        struct lttng_consumer_channel *channel =
                caa_container_of(node, struct lttng_consumer_channel, node);
 
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_free_channel(channel);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
        free(channel);
 }
 
@@ -561,6 +573,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
+       pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
        if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
@@ -997,7 +1010,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
 
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
-       DBG("Allocated channel (key %" PRIu64 ")", channel->key)
+       DBG("Allocated channel (key %" PRIu64 ")", channel->key);
 
 end:
        return channel;
@@ -1208,6 +1221,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
                off_t orig_offset)
 {
+       int ret;
        int outfd = stream->out_fd;
 
        /*
@@ -1238,8 +1252,12 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
         * defined. So it can be expected to lead to lower throughput in
         * streaming.
         */
-       posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+       ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
                        stream->max_sb_size, POSIX_FADV_DONTNEED);
+       if (ret && ret != -ENOSYS) {
+               errno = ret;
+               PERROR("posix_fadvise on fd %i", outfd);
+       }
 }
 
 /*
@@ -1573,6 +1591,12 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        outfd = stream->out_fd;
 
                        if (stream->index_fd >= 0) {
+                               ret = close(stream->index_fd);
+                               if (ret < 0) {
+                                       PERROR("Closing index");
+                                       goto end;
+                               }
+                               stream->index_fd = -1;
                                ret = index_create_file(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
@@ -1633,8 +1657,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                lttng_sync_file_range(outfd, stream->out_fd_offset, len,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += len;
+               lttng_consumer_sync_trace_file(stream, orig_offset);
        }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
 
 write_error:
        /*
@@ -1756,6 +1780,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        outfd = stream->out_fd;
 
                        if (stream->index_fd >= 0) {
+                               ret = close(stream->index_fd);
+                               if (ret < 0) {
+                                       PERROR("Closing index");
+                                       goto end;
+                               }
+                               stream->index_fd = -1;
                                ret = index_create_file(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
@@ -1839,7 +1869,9 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                stream->output_written += ret_splice;
                written += ret_splice;
        }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
+       if (!relayd) {
+               lttng_consumer_sync_trace_file(stream, orig_offset);
+       }
        goto end;
 
 write_error:
@@ -2180,26 +2212,24 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the metadata pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
                health_poll_entry();
+               DBG("Metadata poll wait");
                ret = lttng_poll_wait(&events, -1);
+               DBG("Metadata poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
-               DBG("Metadata event catched in thread");
+               DBG("Metadata event caught in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
-                               ERR("Poll EINTR catched");
+                               ERR("Poll EINTR caught");
                                goto restart;
                        }
-                       goto error;
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
+                       goto end;
                }
 
                nb_fd = ret;
@@ -2211,27 +2241,28 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
-                               if (revents & (LPOLLERR | LPOLLHUP )) {
-                                       DBG("Metadata thread pipe hung up");
-                                       /*
-                                        * Remove the pipe from the poll set and continue the loop
-                                        * since their might be data to consume.
-                                        */
-                                       lttng_poll_del(&events,
-                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
-                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
-                                       continue;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        ssize_t pipe_len;
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
                                        if (pipe_len < sizeof(stream)) {
-                                               PERROR("read metadata stream");
+                                               if (pipe_len < 0) {
+                                                       PERROR("read metadata stream");
+                                               }
                                                /*
-                                                * Continue here to handle the rest of the streams.
+                                                * Remove the pipe from the poll set and continue the loop
+                                                * since their might be data to consume.
                                                 */
+                                               lttng_poll_del(&events,
+                                                               lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                               lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                                                continue;
                                        }
 
@@ -2248,6 +2279,19 @@ restart:
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI | LPOLLHUP);
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Metadata thread pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
                                }
 
                                /* Handle other stream */
@@ -2266,8 +2310,30 @@ restart:
                        stream = caa_container_of(node, struct lttng_consumer_stream,
                                        node);
 
-                       /* Check for error event */
-                       if (revents & (LPOLLERR | LPOLLHUP)) {
+                       if (revents & (LPOLLIN | LPOLLPRI)) {
+                               /* Get the data out of the metadata file descriptor */
+                               DBG("Metadata available on fd %d", pollfd);
+                               assert(stream->wait_fd == pollfd);
+
+                               do {
+                                       health_code_update();
+
+                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       /*
+                                        * We don't check the return value here since if we get
+                                        * a negative len, it means an error occured thus we
+                                        * simply remove it from the poll set and free the
+                                        * stream.
+                                        */
+                               } while (len > 0);
+
+                               /* It's ok to have an unavailable sub-buffer */
+                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
+                                       /* Clean up stream from consumer and free it. */
+                                       lttng_poll_del(&events, stream->wait_fd);
+                                       consumer_del_metadata_stream(stream, metadata_ht);
+                               }
+                       } else if (revents & (LPOLLERR | LPOLLHUP)) {
                                DBG("Metadata fd %d is hup|err.", pollfd);
                                if (!stream->hangup_flush_done
                                                && (consumer_data.type == LTTNG_CONSUMER32_UST
@@ -2295,31 +2361,11 @@ restart:
                                 * and securely free the stream.
                                 */
                                consumer_del_metadata_stream(stream, metadata_ht);
-                       } else if (revents & (LPOLLIN | LPOLLPRI)) {
-                               /* Get the data out of the metadata file descriptor */
-                               DBG("Metadata available on fd %d", pollfd);
-                               assert(stream->wait_fd == pollfd);
-
-                               do {
-                                       health_code_update();
-
-                                       len = ctx->on_buffer_ready(stream, ctx);
-                                       /*
-                                        * We don't check the return value here since if we get
-                                        * a negative len, it means an error occured thus we
-                                        * simply remove it from the poll set and free the
-                                        * stream.
-                                        */
-                               } while (len > 0);
-
-                               /* It's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       /* Clean up stream from consumer and free it. */
-                                       lttng_poll_del(&events, stream->wait_fd);
-                                       consumer_del_metadata_stream(stream, metadata_ht);
-                               }
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               rcu_read_unlock();
+                               goto end;
                        }
-
                        /* Release RCU lock for the stream looked up */
                        rcu_read_unlock();
                }
@@ -2327,7 +2373,6 @@ restart:
 
        /* All is OK */
        err = 0;
-error:
 end:
        DBG("Metadata poll thread exiting");
 
@@ -2750,25 +2795,23 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the channel pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
+               DBG("Channel poll wait");
                health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               DBG("Channel poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
-               DBG("Channel event catched in thread");
+               DBG("Channel event caught in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
-                               ERR("Poll EINTR catched");
+                               ERR("Poll EINTR caught");
                                goto restart;
                        }
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
                        goto end;
                }
 
@@ -2781,26 +2824,22 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
                        if (!revents) {
+                               /* No activity for this FD (poll implementation). */
                                continue;
                        }
+
                        if (pollfd == ctx->consumer_channel_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP)) {
-                                       DBG("Channel thread pipe hung up");
-                                       /*
-                                        * Remove the pipe from the poll set and continue the loop
-                                        * since their might be data to consume.
-                                        */
-                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
-                                       continue;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        enum consumer_channel_action action;
                                        uint64_t key;
 
                                        ret = read_channel_pipe(ctx, &chan, &key, &action);
                                        if (ret <= 0) {
-                                               ERR("Error reading channel pipe");
+                                               if (ret < 0) {
+                                                       ERR("Error reading channel pipe");
+                                               }
+                                               lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
                                                continue;
                                        }
 
@@ -2817,7 +2856,7 @@ restart:
                                                rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                lttng_poll_add(&events, chan->wait_fd,
-                                                               LPOLLIN | LPOLLPRI);
+                                                               LPOLLERR | LPOLLHUP);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
@@ -2877,6 +2916,17 @@ restart:
                                                ERR("Unknown action");
                                                break;
                                        }
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Channel thread pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
                                }
 
                                /* Handle other stream */
@@ -2915,6 +2965,10 @@ restart:
                                                && !uatomic_read(&chan->nb_init_stream_left)) {
                                        consumer_del_channel(chan);
                                }
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               rcu_read_unlock();
+                               goto end;
                        }
 
                        /* Release RCU lock for the channel looked up */
@@ -3649,22 +3703,19 @@ int consumer_send_status_channel(int sock,
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
 
-/*
- * Using a maximum stream size with the produced and consumed position of a
- * stream, computes the new consumed position to be as close as possible to the
- * maximum possible stream size.
- *
- * If maximum stream size is lower than the possible buffer size (produced -
- * consumed), the consumed_pos given is returned untouched else the new value
- * is returned.
- */
-unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
-               unsigned long produced_pos, uint64_t max_stream_size)
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t nb_packets_per_stream,
+               uint64_t max_sb_size)
 {
-       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
-               /* Offset from the produced position to get the latest buffers. */
-               return produced_pos - max_stream_size;
-       }
+       unsigned long start_pos;
 
-       return consumed_pos;
+       if (!nb_packets_per_stream) {
+               return consumed_pos;    /* Grab everything */
+       }
+       start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size);
+       start_pos -= max_sb_size * nb_packets_per_stream;
+       if ((long) (start_pos - consumed_pos) < 0) {
+               return consumed_pos;    /* Grab everything */
+       }
+       return start_pos;
 }
This page took 0.029726 seconds and 4 git commands to generate.