Fix: grab more than one packet for snapshots
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 15 Jan 2015 22:24:27 +0000 (17:24 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 22 Jan 2015 02:50:22 +0000 (21:50 -0500)
There are a few issues with snapshot size: when taking a snapshot
without specifying any "max size" (should be unlimited), only a single
packet from each stream is saved. We expect all stream available content
to be saved. There is a similar issue when a max size is specified.

Also, trying to make all streams save as much data has unexpected
corner-cases: for instance, if we have this configuration:
- kernel channels: 2 subbuffers of 1MB x 8 CPUs
- per-PID UST channels: 16 subbuffers of 4kB x 8 CPUs x 100 apps

would require the user to have a very large max size, since it would try
to fit (8 + (100 * 8)) * 1MB = 808MB of sub-buffers, else it would fail.
This issue here is using the largest subbuffer size as the criterion
applied to all channels.

We fix those issues by simplifying the algorithm used to calculate how
much data to grab. Rather than calculating the size to grab from each
stream, we calculate a number of packets to grab. It fails if we cannot
grab at least one packet from each stream in the session. Then checks if
it can grab 2 packets from each stream, and so on, until there is no
more space available (based on max size). This is not a perfect
solution, but has the merit of being simple to understand, and has no
(or few) unexpected corner-cases.

Fixes #860

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Conflicts:
src/bin/lttng-sessiond/kernel.h

21 files changed:
doc/man/lttng.1
src/bin/lttng-sessiond/buffer-registry.h
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/kernel.h
src/bin/lttng-sessiond/snapshot.c
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/bin/lttng/commands/snapshot.c
src/common/align.h [new file with mode: 0644]
src/common/bug.h [new file with mode: 0644]
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c
src/lib/lttng-ctl/filter/align.h [deleted file]
src/lib/lttng-ctl/filter/bug.h [deleted file]
src/lib/lttng-ctl/filter/filter-visitor-generate-bytecode.c

index 252d3b5da5455d29c146382fa498278181106b67..d5b54171aa7cbd9febac75632ddb81aa2ec01990 100644 (file)
@@ -888,10 +888,6 @@ Name of the snapshot's output.
 Maximum size in bytes of the snapshot. The maximum size does not include the
 metadata file. Human readable format is accepted: {+k,+M,+G}. For instance,
 \-\-max-size 5M
-
-The minimum size of a snapshot is computed by multiplying the total amount of
-streams in the session by the largest subbuffer size. This is to ensure
-fairness between channels when extracting data.
 .TP
 .BR "\-C, \-\-ctrl-url URL"
 Set control path URL. (Must use -D also)
index c2099b6883c7ff70174431635b33080eb3e4d4b2..656fc9175b7d1bd6415ed3a2fd63ad9cc8def4f4 100644 (file)
@@ -51,6 +51,8 @@ struct buffer_reg_channel {
        struct lttng_ht_node_u64 node;
        /* Size of subbuffers in this channel. */
        size_t subbuf_size;
+       /* Number of subbuffers per stream. */
+       size_t num_subbuf;
        union {
                /* Original object data that MUST be copied over. */
                struct lttng_ust_object_data *ust;
index 6f1f2514127efbc0d693b13f3f6375f42ff8a251..c2db9ef65ce6fba4d5694d16bb4fcfc9085be65d 100644 (file)
@@ -2857,7 +2857,7 @@ error:
  */
 static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
                struct snapshot_output *output, struct ltt_session *session,
-               int wait, uint64_t max_stream_size)
+               int wait, uint64_t nb_packets_per_stream)
 {
        int ret;
 
@@ -2888,7 +2888,7 @@ static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
                goto error_snapshot;
        }
 
-       ret = kernel_snapshot_record(ksess, output, wait, max_stream_size);
+       ret = kernel_snapshot_record(ksess, output, wait, nb_packets_per_stream);
        if (ret != LTTNG_OK) {
                goto error_snapshot;
        }
@@ -2911,7 +2911,7 @@ end:
  */
 static int record_ust_snapshot(struct ltt_ust_session *usess,
                struct snapshot_output *output, struct ltt_session *session,
-               int wait, uint64_t max_stream_size)
+               int wait, uint64_t nb_packets_per_stream)
 {
        int ret;
 
@@ -2942,7 +2942,7 @@ static int record_ust_snapshot(struct ltt_ust_session *usess,
                goto error_snapshot;
        }
 
-       ret = ust_app_snapshot_record(usess, output, wait, max_stream_size);
+       ret = ust_app_snapshot_record(usess, output, wait, nb_packets_per_stream);
        if (ret < 0) {
                switch (-ret) {
                case EINVAL:
@@ -2967,69 +2967,90 @@ error:
        return ret;
 }
 
-/*
- * Return the biggest subbuffer size of all channels in the given session.
- */
-static uint64_t get_session_max_subbuf_size(struct ltt_session *session)
+static
+uint64_t get_session_size_one_more_packet_per_stream(struct ltt_session *session,
+       uint64_t cur_nr_packets)
 {
-       uint64_t max_size = 0;
-
-       assert(session);
+       uint64_t tot_size = 0;
 
        if (session->kernel_session) {
                struct ltt_kernel_channel *chan;
                struct ltt_kernel_session *ksess = session->kernel_session;
 
-               /*
-                * For each channel, add to the max size the size of each subbuffer
-                * multiplied by their sized.
-                */
                cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
-                       if (chan->channel->attr.subbuf_size > max_size) {
-                               max_size = chan->channel->attr.subbuf_size;
+                       if (cur_nr_packets >= chan->channel->attr.num_subbuf) {
+                               /*
+                                * Don't take channel into account if we
+                                * already grab all its packets.
+                                */
+                               continue;
                        }
+                       tot_size += chan->channel->attr.subbuf_size
+                               * chan->stream_count;
                }
        }
 
        if (session->ust_session) {
-               struct lttng_ht_iter iter;
-               struct ltt_ust_channel *uchan;
                struct ltt_ust_session *usess = session->ust_session;
 
-               rcu_read_lock();
-               cds_lfht_for_each_entry(usess->domain_global.channels->ht, &iter.iter,
-                               uchan, node.node) {
-                       if (uchan->attr.subbuf_size > max_size) {
-                               max_size = uchan->attr.subbuf_size;
-                       }
-               }
-               rcu_read_unlock();
+               tot_size += ust_app_get_size_one_more_packet_per_stream(usess,
+                               cur_nr_packets);
        }
 
-       return max_size;
+       return tot_size;
 }
 
 /*
- * Returns the total number of streams for a session or a negative value
- * on error.
+ * Calculate the number of packets we can grab from each stream that
+ * fits within the overall snapshot max size.
+ *
+ * Returns -1 on error, 0 means infinite number of packets, else > 0 is
+ * the number of packets per stream.
+ *
+ * TODO: this approach is not perfect: we consider the worse case
+ * (packet filling the sub-buffers) as an upper bound, but we could do
+ * better if we do this calculation while we actually grab the packet
+ * content: we would know how much padding we don't actually store into
+ * the file.
+ *
+ * This algorithm is currently bounded by the number of packets per
+ * stream.
+ *
+ * Since we call this algorithm before actually grabbing the data, it's
+ * an approximation: for instance, applications could appear/disappear
+ * in between this call and actually grabbing data.
  */
-static unsigned int get_session_nb_streams(struct ltt_session *session)
+static
+int64_t get_session_nb_packets_per_stream(struct ltt_session *session, uint64_t max_size)
 {
-       unsigned int total_streams = 0;
-
-       if (session->kernel_session) {
-               struct ltt_kernel_session *ksess = session->kernel_session;
+       int64_t size_left;
+       uint64_t cur_nb_packets = 0;
 
-               total_streams += ksess->stream_count_global;
+       if (!max_size) {
+               return 0;       /* Infinite */
        }
 
-       if (session->ust_session) {
-               struct ltt_ust_session *usess = session->ust_session;
+       size_left = max_size;
+       for (;;) {
+               uint64_t one_more_packet_tot_size;
 
-               total_streams += ust_app_get_nb_stream(usess);
+               one_more_packet_tot_size = get_session_size_one_more_packet_per_stream(session,
+                                       cur_nb_packets);
+               if (!one_more_packet_tot_size) {
+                       /* We are already grabbing all packets. */
+                       break;
+               }
+               size_left -= one_more_packet_tot_size;
+               if (size_left < 0) {
+                       break;
+               }
+               cur_nb_packets++;
        }
-
-       return total_streams;
+       if (!cur_nb_packets) {
+               /* Not enough room to grab one packet of each stream, error. */
+               return -1;
+       }
+       return cur_nb_packets;
 }
 
 /*
@@ -3047,7 +3068,6 @@ int cmd_snapshot_record(struct ltt_session *session,
        unsigned int use_tmp_output = 0;
        struct snapshot_output tmp_output;
        unsigned int nb_streams, snapshot_success = 0;
-       uint64_t session_max_size = 0, max_stream_size = 0;
 
        assert(session);
 
@@ -3086,44 +3106,20 @@ int cmd_snapshot_record(struct ltt_session *session,
                use_tmp_output = 1;
        }
 
-       /*
-        * Get the session maximum size for a snapshot meaning it will compute the
-        * size of all streams from all domain.
-        */
-       max_stream_size = get_session_max_subbuf_size(session);
-
-       nb_streams = get_session_nb_streams(session);
-       if (nb_streams) {
-               /*
-                * The maximum size of the snapshot is the number of streams multiplied
-                * by the biggest subbuf size of all channels in a session which is the
-                * maximum stream size available for each stream. The session max size
-                * is now checked against the snapshot max size value given by the user
-                * and if lower, an error is returned.
-                */
-               session_max_size = max_stream_size * nb_streams;
-       }
-
-       DBG3("Snapshot max size is %" PRIu64 " for max stream size of %" PRIu64,
-                       session_max_size, max_stream_size);
-
-       /*
-        * If we use a temporary output, check right away if the max size fits else
-        * for each output the max size will be checked.
-        */
-       if (use_tmp_output &&
-                       (tmp_output.max_size != 0 &&
-                       tmp_output.max_size < session_max_size)) {
-               ret = LTTNG_ERR_MAX_SIZE_INVALID;
-               goto error;
-       }
-
        if (session->kernel_session) {
                struct ltt_kernel_session *ksess = session->kernel_session;
 
                if (use_tmp_output) {
+                       int64_t nb_packets_per_stream;
+
+                       nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                       tmp_output.max_size);
+                       if (nb_packets_per_stream < 0) {
+                               ret = LTTNG_ERR_MAX_SIZE_INVALID;
+                               goto error;
+                       }
                        ret = record_kernel_snapshot(ksess, &tmp_output, session,
-                                       wait, max_stream_size);
+                                       wait, nb_packets_per_stream);
                        if (ret != LTTNG_OK) {
                                goto error;
                        }
@@ -3135,6 +3131,8 @@ int cmd_snapshot_record(struct ltt_session *session,
                        rcu_read_lock();
                        cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
                                        &iter.iter, sout, node.node) {
+                               int64_t nb_packets_per_stream;
+
                                /*
                                 * Make a local copy of the output and assign the possible
                                 * temporary value given by the caller.
@@ -3142,14 +3140,13 @@ int cmd_snapshot_record(struct ltt_session *session,
                                memset(&tmp_output, 0, sizeof(tmp_output));
                                memcpy(&tmp_output, sout, sizeof(tmp_output));
 
-                               /* Use temporary max size. */
                                if (output->max_size != (uint64_t) -1ULL) {
                                        tmp_output.max_size = output->max_size;
                                }
 
-                               if (tmp_output.max_size != 0 &&
-                                               tmp_output.max_size < session_max_size) {
-                                       rcu_read_unlock();
+                               nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                               tmp_output.max_size);
+                               if (nb_packets_per_stream < 0) {
                                        ret = LTTNG_ERR_MAX_SIZE_INVALID;
                                        goto error;
                                }
@@ -3163,7 +3160,7 @@ int cmd_snapshot_record(struct ltt_session *session,
                                tmp_output.nb_snapshot = session->snapshot.nb_snapshot;
 
                                ret = record_kernel_snapshot(ksess, &tmp_output,
-                                               session, wait, max_stream_size);
+                                               session, wait, nb_packets_per_stream);
                                if (ret != LTTNG_OK) {
                                        rcu_read_unlock();
                                        goto error;
@@ -3178,8 +3175,16 @@ int cmd_snapshot_record(struct ltt_session *session,
                struct ltt_ust_session *usess = session->ust_session;
 
                if (use_tmp_output) {
+                       int64_t nb_packets_per_stream;
+
+                       nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                       tmp_output.max_size);
+                       if (nb_packets_per_stream < 0) {
+                               ret = LTTNG_ERR_MAX_SIZE_INVALID;
+                               goto error;
+                       }
                        ret = record_ust_snapshot(usess, &tmp_output, session,
-                                       wait, max_stream_size);
+                                       wait, nb_packets_per_stream);
                        if (ret != LTTNG_OK) {
                                goto error;
                        }
@@ -3191,6 +3196,8 @@ int cmd_snapshot_record(struct ltt_session *session,
                        rcu_read_lock();
                        cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
                                        &iter.iter, sout, node.node) {
+                               int64_t nb_packets_per_stream;
+
                                /*
                                 * Make a local copy of the output and assign the possible
                                 * temporary value given by the caller.
@@ -3198,15 +3205,15 @@ int cmd_snapshot_record(struct ltt_session *session,
                                memset(&tmp_output, 0, sizeof(tmp_output));
                                memcpy(&tmp_output, sout, sizeof(tmp_output));
 
-                               /* Use temporary max size. */
                                if (output->max_size != (uint64_t) -1ULL) {
                                        tmp_output.max_size = output->max_size;
                                }
 
-                               if (tmp_output.max_size != 0 &&
-                                               tmp_output.max_size < session_max_size) {
-                                       rcu_read_unlock();
+                               nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                               tmp_output.max_size);
+                               if (nb_packets_per_stream < 0) {
                                        ret = LTTNG_ERR_MAX_SIZE_INVALID;
+                                       rcu_read_unlock();
                                        goto error;
                                }
 
@@ -3219,7 +3226,7 @@ int cmd_snapshot_record(struct ltt_session *session,
                                tmp_output.nb_snapshot = session->snapshot.nb_snapshot;
 
                                ret = record_ust_snapshot(usess, &tmp_output, session,
-                                               wait, max_stream_size);
+                                               wait, nb_packets_per_stream);
                                if (ret != LTTNG_OK) {
                                        rcu_read_unlock();
                                        goto error;
index ce3e5da9adda44d3481c4bb07785a30f76ac14f5..166d6c0440202d9798a6bbad8b656fb35bb9432b 100644 (file)
@@ -1276,7 +1276,7 @@ end:
  */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait, int max_stream_size)
+               const char *session_path, int wait, uint64_t nb_packets_per_stream)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -1290,7 +1290,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
        memset(&msg, 0, sizeof(msg));
        msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
        msg.u.snapshot_channel.key = key;
-       msg.u.snapshot_channel.max_stream_size = max_stream_size;
+       msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
        msg.u.snapshot_channel.metadata = metadata;
 
        if (output->consumer->type == CONSUMER_DST_NET) {
index 3601ed9147c170ae3a5bb8a7180f4e3f8e44279b..a9266d5b939798882377bc2caeb0e67251464fa2 100644 (file)
@@ -279,6 +279,6 @@ int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
 /* Snapshot command. */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait, int max_size_per_stream);
+               const char *session_path, int wait, uint64_t nb_packets_per_stream);
 
 #endif /* _CONSUMER_H */
index fc95302fccb9336332df0517b32dfec5a02954d5..de6023dcd49dbd5137424db95c19ca5833af9717 100644 (file)
@@ -826,7 +826,8 @@ void kernel_destroy_channel(struct ltt_kernel_channel *kchan)
  * Return 0 on success or else return a LTTNG_ERR code.
  */
 int kernel_snapshot_record(struct ltt_kernel_session *ksess,
-               struct snapshot_output *output, int wait, uint64_t max_size_per_stream)
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream)
 {
        int err, ret, saved_metadata_fd;
        struct consumer_socket *socket;
@@ -887,7 +888,7 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                        ret = consumer_snapshot_channel(socket, chan->fd, output, 0,
                                        ksess->uid, ksess->gid,
                                        DEFAULT_KERNEL_TRACE_DIR, wait,
-                                       max_size_per_stream);
+                                       nb_packets_per_stream);
                        pthread_mutex_unlock(socket->lock);
                        if (ret < 0) {
                                ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
@@ -901,7 +902,7 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                pthread_mutex_lock(socket->lock);
                ret = consumer_snapshot_channel(socket, ksess->metadata->fd, output,
                                1, ksess->uid, ksess->gid,
-                               DEFAULT_KERNEL_TRACE_DIR, wait, max_size_per_stream);
+                               DEFAULT_KERNEL_TRACE_DIR, wait, 0);
                pthread_mutex_unlock(socket->lock);
                if (ret < 0) {
                        ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
index 681301fc2f2c73ed971be54e920dcd8ade5adb32..88e8a2631cffe734a6fcd701a260c13d6c2ffc7e 100644 (file)
@@ -55,8 +55,8 @@ int kernel_validate_version(int tracer_fd);
 void kernel_destroy_session(struct ltt_kernel_session *ksess);
 void kernel_destroy_channel(struct ltt_kernel_channel *kchan);
 int kernel_snapshot_record(struct ltt_kernel_session *ksess,
-               struct snapshot_output *output, int wait, uint64_t max_stream_size);
-
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream);
 int init_kernel_workarounds(void);
 
 #endif /* _LTT_KERNEL_CTL_H */
index 8faf9a76a28f99f497c1b4542ed0acdfd5d5603f..e5660dac7b02f13bb59c49cabfdd93ba3952ccdf 100644 (file)
@@ -46,10 +46,11 @@ static int output_init(uint64_t max_size, const char *name,
 {
        int ret = 0, i;
 
-       assert(output);
-
        memset(output, 0, sizeof(struct snapshot_output));
 
+       /*
+        * max_size of -1ULL means unset. Set to default (unlimited).
+        */
        if (max_size == (uint64_t) -1ULL) {
                max_size = 0;
        }
index f0e61808ab6e9ee5e66e3bdf602333ed8e98a1fd..0ffcb3b6ab7e60a6fb44780413183803d923a462 100644 (file)
@@ -2398,6 +2398,7 @@ static int create_buffer_reg_channel(struct buffer_reg_session *reg_sess,
        assert(reg_chan);
        reg_chan->consumer_key = ua_chan->key;
        reg_chan->subbuf_size = ua_chan->attr.subbuf_size;
+       reg_chan->num_subbuf = ua_chan->attr.num_subbuf;
 
        /* Create and add a channel registry to session. */
        ret = ust_registry_channel_add(reg_sess->reg.ust,
@@ -5045,7 +5046,8 @@ void ust_app_destroy(struct ust_app *app)
  * Return 0 on success or else a negative value.
  */
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait, uint64_t max_stream_size)
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream)
 {
        int ret = 0;
        unsigned int snapshot_done = 0;
@@ -5089,14 +5091,14 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                                        reg_chan, node.node) {
                                ret = consumer_snapshot_channel(socket, reg_chan->consumer_key,
                                                output, 0, usess->uid, usess->gid, pathname, wait,
-                                               max_stream_size);
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
                        }
                        ret = consumer_snapshot_channel(socket,
                                        reg->registry->reg.ust->metadata_key, output, 1,
-                                       usess->uid, usess->gid, pathname, wait, max_stream_size);
+                                       usess->uid, usess->gid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
@@ -5140,7 +5142,7 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                                        ua_chan, node.node) {
                                ret = consumer_snapshot_channel(socket, ua_chan->key, output,
                                                0, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                               max_stream_size);
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -5149,8 +5151,7 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        registry = get_session_registry(ua_sess);
                        assert(registry);
                        ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
-                                       1, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                       max_stream_size);
+                                       1, ua_sess->euid, ua_sess->egid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
@@ -5178,11 +5179,12 @@ error:
 }
 
 /*
- * Return the number of streams for a UST session.
+ * Return the size taken by one more packet per stream.
  */
-unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
+uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *usess,
+               uint64_t cur_nr_packets)
 {
-       unsigned int ret = 0;
+       uint64_t tot_size = 0;
        struct ust_app *app;
        struct lttng_ht_iter iter;
 
@@ -5199,7 +5201,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                        rcu_read_lock();
                        cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
                                        reg_chan, node.node) {
-                               ret += reg_chan->stream_count;
+                               if (cur_nr_packets >= reg_chan->num_subbuf) {
+                                       /*
+                                        * Don't take channel into account if we
+                                        * already grab all its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += reg_chan->subbuf_size * reg_chan->stream_count;
                        }
                        rcu_read_unlock();
                }
@@ -5221,7 +5230,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
 
                        cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                        ua_chan, node.node) {
-                               ret += ua_chan->streams.count;
+                               if (cur_nr_packets >= ua_chan->attr.num_subbuf) {
+                                       /*
+                                        * Don't take channel into account if we
+                                        * already grab all its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += ua_chan->attr.subbuf_size * ua_chan->streams.count;
                        }
                }
                rcu_read_unlock();
@@ -5232,5 +5248,5 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                break;
        }
 
-       return ret;
+       return tot_size;
 }
index f83e857cfae0f36b17e8fe4159a6e538cbbf9af9..3e3eb3197c1b8f3aa13ee9175ded3cc3bf6da74f 100644 (file)
@@ -325,8 +325,10 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
                struct consumer_socket *socket, int send_zero_data);
 void ust_app_destroy(struct ust_app *app);
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait, uint64_t max_stream_size);
-unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess);
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream);
+uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *usess,
+               uint64_t cur_nr_packets);
 struct ust_app *ust_app_find_by_sock(int sock);
 
 static inline
index 2d250afc37cfb01df040159936dcbef24444f90d..936379eed061f123f099ad70f3ccaa59f72a2e9b 100644 (file)
@@ -373,9 +373,7 @@ static int record(const char *url)
        ret = lttng_snapshot_record(current_session_name, output, 0);
        if (ret < 0) {
                if (ret == -LTTNG_ERR_MAX_SIZE_INVALID) {
-                       ERR("The minimum size of a snapshot is computed by multiplying "
-                                       "the total amount of streams with the largest subbuffer "
-                                       "in the session.");
+                       ERR("Invalid snapshot size. Cannot fit at least one packet per stream.");
                }
                goto error;
        }
diff --git a/src/common/align.h b/src/common/align.h
new file mode 100644 (file)
index 0000000..928c5b6
--- /dev/null
@@ -0,0 +1,64 @@
+#ifndef _LTTNG_ALIGN_H
+#define _LTTNG_ALIGN_H
+
+/*
+ * align.h
+ *
+ * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ */
+
+#include "bug.h"
+#include <unistd.h>
+#include <limits.h>
+
+#ifndef PAGE_SIZE      /* Cygwin limits.h defines its own PAGE_SIZE */
+#define PAGE_SIZE              sysconf(_SC_PAGE_SIZE)
+#endif
+
+#define PAGE_MASK              (~(PAGE_SIZE - 1))
+#define __ALIGN_MASK(v, mask)  (((v) + (mask)) & ~(mask))
+#define ALIGN(v, align)                __ALIGN_MASK(v, (__typeof__(v)) (align) - 1)
+#define PAGE_ALIGN(addr)       ALIGN(addr, PAGE_SIZE)
+
+/**
+ * offset_align - Calculate the offset needed to align an object on its natural
+ *                alignment towards higher addresses.
+ * @align_drift:  object offset from an "alignment"-aligned address.
+ * @alignment:    natural object alignment. Must be non-zero, power of 2.
+ *
+ * Returns the offset that must be added to align towards higher
+ * addresses.
+ */
+#define offset_align(align_drift, alignment)                                  \
+       ({                                                                     \
+               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
+                                  || ((alignment) & ((alignment) - 1)));      \
+               (((alignment) - (align_drift)) & ((alignment) - 1));           \
+       })
+
+/**
+ * offset_align_floor - Calculate the offset needed to align an object
+ *                      on its natural alignment towards lower addresses.
+ * @align_drift:  object offset from an "alignment"-aligned address.
+ * @alignment:    natural object alignment. Must be non-zero, power of 2.
+ *
+ * Returns the offset that must be substracted to align towards lower addresses.
+ */
+#define offset_align_floor(align_drift, alignment)                            \
+       ({                                                                     \
+               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
+                                  || ((alignment) & ((alignment) - 1)));      \
+               (((align_drift) - (alignment)) & ((alignment) - 1));           \
+       })
+
+#endif /* _LTTNG_ALIGN_H */
diff --git a/src/common/bug.h b/src/common/bug.h
new file mode 100644 (file)
index 0000000..9368c08
--- /dev/null
@@ -0,0 +1,54 @@
+#ifndef _LTTNG_BUG_H
+#define _LTTNG_BUG_H
+
+/*
+ * lttng/bug.h
+ *
+ * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ */
+
+#include <urcu/compiler.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define LTTNG_BUG_ON(condition)                                                \
+       do {                                                            \
+               if (caa_unlikely(condition)) {                          \
+                       fprintf(stderr,                                 \
+                               "LTTng BUG in file %s, line %d.\n",     \
+                               __FILE__, __LINE__);                    \
+                       exit(EXIT_FAILURE);                             \
+               }                                                       \
+       } while (0)
+
+#define LTTNG_BUILD_BUG_ON(condition)                                  \
+       ((void) sizeof(char[-!!(condition)]))
+
+/**
+ * LTTNG_BUILD_RUNTIME_BUG_ON - check condition at build (if constant) or runtime
+ * @condition: the condition which should be false.
+ *
+ * If the condition is a constant and true, the compiler will generate a build
+ * error. If the condition is not constant, a BUG will be triggered at runtime
+ * if the condition is ever true. If the condition is constant and false, no
+ * code is emitted.
+ */
+#define LTTNG_BUILD_RUNTIME_BUG_ON(condition)                  \
+       do {                                                    \
+               if (__builtin_constant_p(condition))            \
+                       LTTNG_BUILD_BUG_ON(condition);          \
+               else                                            \
+                       LTTNG_BUG_ON(condition);                \
+       } while (0)
+
+#endif
index 5041ed951b9ffa7d56d639f1e20a0afd3776e34f..e01e2fdfe44ca1170dcd8a1e5df9fce678006acc 100644 (file)
@@ -47,6 +47,7 @@
 #include "consumer.h"
 #include "consumer-stream.h"
 #include "consumer-testpoint.h"
+#include "align.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -3651,22 +3652,19 @@ int consumer_send_status_channel(int sock,
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
 
-/*
- * Using a maximum stream size with the produced and consumed position of a
- * stream, computes the new consumed position to be as close as possible to the
- * maximum possible stream size.
- *
- * If maximum stream size is lower than the possible buffer size (produced -
- * consumed), the consumed_pos given is returned untouched else the new value
- * is returned.
- */
-unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
-               unsigned long produced_pos, uint64_t max_stream_size)
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t nb_packets_per_stream,
+               uint64_t max_sb_size)
 {
-       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
-               /* Offset from the produced position to get the latest buffers. */
-               return produced_pos - max_stream_size;
-       }
+       unsigned long start_pos;
 
-       return consumed_pos;
+       if (!nb_packets_per_stream) {
+               return consumed_pos;    /* Grab everything */
+       }
+       start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size);
+       start_pos -= max_sb_size * nb_packets_per_stream;
+       if ((long) (start_pos - consumed_pos) < 0) {
+               return consumed_pos;    /* Grab everything */
+       }
+       return start_pos;
 }
index 1e378f04ea631ea4387a1940c407599d19a63966..790cb6b135d9b9983a9826e4713112ad586da247 100644 (file)
@@ -666,8 +666,9 @@ int consumer_send_status_channel(int sock,
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
                uint64_t key);
 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
-unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
-               unsigned long produced_pos, uint64_t max_stream_size);
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t nb_packets_per_stream,
+               uint64_t max_sb_size);
 int consumer_add_data_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
 int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
index 027aa6975accde948c5fa81de5c82d36b0f55b38..93cebadfb476e7e6f894fed8022bf1e9240561f2 100644 (file)
@@ -114,7 +114,7 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
-               uint64_t relayd_id, uint64_t max_stream_size,
+               uint64_t relayd_id, uint64_t nb_packets_per_stream,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
@@ -220,14 +220,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        }
                }
 
-               /*
-                * The original value is sent back if max stream size is larger than
-                * the possible size of the snapshot. Also, we asume that the session
-                * daemon should never send a maximum stream size that is lower than
-                * subbuffer size.
-                */
-               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
-                               produced_pos, max_stream_size);
+               consumed_pos = consumer_get_consume_start_pos(consumed_pos,
+                               produced_pos, nb_packets_per_stream,
+                               stream->max_sb_size);
 
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
@@ -885,7 +880,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
-                                       msg.u.snapshot_channel.max_stream_size,
+                                       msg.u.snapshot_channel.nb_packets_per_stream,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
index bae083dd83018e002afa36e2b648f586fca9bac1..6153da710e7f726cbf433e60c6d290cfbbc01478 100644 (file)
@@ -442,7 +442,7 @@ struct lttcomm_consumer_msg {
                        uint32_t metadata;              /* This a metadata snapshot. */
                        uint64_t relayd_id;             /* Relayd id if apply. */
                        uint64_t key;
-                       uint64_t max_stream_size;
+                       uint64_t nb_packets_per_stream;
                } LTTNG_PACKED snapshot_channel;
                struct {
                        uint64_t channel_key;
index bdace6586881c3eb76a0505fd9ea553b36bbd012..f891828f700afca41c7cb0ea6606c7943efcac1f 100644 (file)
@@ -859,7 +859,7 @@ error:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
-               uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
+               uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned use_relayd = 0;
@@ -941,12 +941,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
 
                /*
                 * The original value is sent back if max stream size is larger than
-                * the possible size of the snapshot. Also, we asume that the session
+                * the possible size of the snapshot. Also, we assume that the session
                 * daemon should never send a maximum stream size that is lower than
                 * subbuffer size.
                 */
-               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
-                               produced_pos, max_stream_size);
+               consumed_pos = consumer_get_consume_start_pos(consumed_pos,
+                               produced_pos, nb_packets_per_stream,
+                               stream->max_sb_size);
 
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
@@ -1489,7 +1490,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
-                                       msg.u.snapshot_channel.max_stream_size,
+                                       msg.u.snapshot_channel.nb_packets_per_stream,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
diff --git a/src/lib/lttng-ctl/filter/align.h b/src/lib/lttng-ctl/filter/align.h
deleted file mode 100644 (file)
index fe32673..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-#ifndef _LTTNG_ALIGN_H
-#define _LTTNG_ALIGN_H
-
-/*
- * align.h
- *
- * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include "bug.h"
-#include <unistd.h>
-#include <limits.h>
-
-#ifndef PAGE_SIZE      /* Cygwin limits.h defines its own PAGE_SIZE */
-#define PAGE_SIZE              sysconf(_SC_PAGE_SIZE)
-#endif
-
-#define PAGE_MASK              (~(PAGE_SIZE - 1))
-#define __ALIGN_MASK(v, mask)  (((v) + (mask)) & ~(mask))
-#define ALIGN(v, align)                __ALIGN_MASK(v, (__typeof__(v)) (align) - 1)
-#define PAGE_ALIGN(addr)       ALIGN(addr, PAGE_SIZE)
-
-/**
- * offset_align - Calculate the offset needed to align an object on its natural
- *                alignment towards higher addresses.
- * @align_drift:  object offset from an "alignment"-aligned address.
- * @alignment:    natural object alignment. Must be non-zero, power of 2.
- *
- * Returns the offset that must be added to align towards higher
- * addresses.
- */
-#define offset_align(align_drift, alignment)                                  \
-       ({                                                                     \
-               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
-                                  || ((alignment) & ((alignment) - 1)));      \
-               (((alignment) - (align_drift)) & ((alignment) - 1));           \
-       })
-
-/**
- * offset_align_floor - Calculate the offset needed to align an object
- *                      on its natural alignment towards lower addresses.
- * @align_drift:  object offset from an "alignment"-aligned address.
- * @alignment:    natural object alignment. Must be non-zero, power of 2.
- *
- * Returns the offset that must be substracted to align towards lower addresses.
- */
-#define offset_align_floor(align_drift, alignment)                            \
-       ({                                                                     \
-               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
-                                  || ((alignment) & ((alignment) - 1)));      \
-               (((align_drift) - (alignment)) & ((alignment) - 1);            \
-       })
-
-#endif /* _LTTNG_ALIGN_H */
diff --git a/src/lib/lttng-ctl/filter/bug.h b/src/lib/lttng-ctl/filter/bug.h
deleted file mode 100644 (file)
index 9368c08..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-#ifndef _LTTNG_BUG_H
-#define _LTTNG_BUG_H
-
-/*
- * lttng/bug.h
- *
- * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include <urcu/compiler.h>
-#include <stdio.h>
-#include <stdlib.h>
-
-#define LTTNG_BUG_ON(condition)                                                \
-       do {                                                            \
-               if (caa_unlikely(condition)) {                          \
-                       fprintf(stderr,                                 \
-                               "LTTng BUG in file %s, line %d.\n",     \
-                               __FILE__, __LINE__);                    \
-                       exit(EXIT_FAILURE);                             \
-               }                                                       \
-       } while (0)
-
-#define LTTNG_BUILD_BUG_ON(condition)                                  \
-       ((void) sizeof(char[-!!(condition)]))
-
-/**
- * LTTNG_BUILD_RUNTIME_BUG_ON - check condition at build (if constant) or runtime
- * @condition: the condition which should be false.
- *
- * If the condition is a constant and true, the compiler will generate a build
- * error. If the condition is not constant, a BUG will be triggered at runtime
- * if the condition is ever true. If the condition is constant and false, no
- * code is emitted.
- */
-#define LTTNG_BUILD_RUNTIME_BUG_ON(condition)                  \
-       do {                                                    \
-               if (__builtin_constant_p(condition))            \
-                       LTTNG_BUILD_BUG_ON(condition);          \
-               else                                            \
-                       LTTNG_BUG_ON(condition);                \
-       } while (0)
-
-#endif
index 1cf7cb5c3cf0324048c7d360980564494eb1450e..bb90ab9ffbc6d3c8ea5d5e176d9737223cb3217f 100644 (file)
@@ -22,7 +22,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <errno.h>
-#include "align.h"
+#include "common/align.h"
 #include "filter-bytecode.h"
 #include "filter-ir.h"
 #include "filter-ast.h"
This page took 0.05347 seconds and 4 git commands to generate.