Fix: UST should not generate packet at destroy after stop
Author:     Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
AuthorDate: Wed, 18 May 2016 18:04:11 +0000 (14:04 -0400)
Commit:     Jérémie Galarneau <jeremie.galarneau@efficios.com>
CommitDate: Thu, 19 May 2016 03:53:08 +0000 (23:53 -0400)
In the following scenario:
- create, enable events (ust),
- start
- ...
- stop (wait for data_pending to complete)
- destroy
- rm the trace directory

We would expect that the "rm" operation would not conflict with the
consumer daemon trying to output data into the trace files, since the
"stop" operation ensured that there was no data_pending.

However, the "destroy" operation currently generates an extra packet
after the data_pending check (the "on_stream_hangup"). This causes the
consumer daemon to try to perform trace file rotation concurrently with
the trace directory removal in the scenario above, which triggers
errors. The main reason this empty packet is generated by "destroy" is
to deal with the trace start/stop scenario, which would otherwise
generate a completely empty stream.

Therefore, introduce the concept of a "quiescent stream". It is
initialized to false on stream creation (the first packet is empty).
When tracing is started, it is set to false (to handle start/stop/start
sequences). When tracing is stopped, if the stream is not quiescent,
perform a "final" flush (which generates an empty packet if the current
packet is empty), and set quiescent to true. On stream "destroy" and on
application hang-up: if the stream is not quiescent, perform a "final"
flush, and set the quiescent state to true.
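
To illustrate the intent (not part of the patch): a minimal sketch of
this per-stream state machine. The struct layout and helper names below
are hypothetical; only the "quiescent" flag and the flush-once
behaviour mirror what the patch implements.

    #include <pthread.h>
    #include <stdbool.h>

    struct stream {                 /* hypothetical, simplified stream */
            pthread_mutex_t lock;   /* protects quiescent */
            bool quiescent;         /* false at creation: first packet is empty */
    };

    /* "start": data may be written again, the stream is no longer complete. */
    static void stream_on_start(struct stream *s)
    {
            pthread_mutex_lock(&s->lock);
            s->quiescent = false;
            pthread_mutex_unlock(&s->lock);
    }

    /*
     * "stop", "destroy" and application hang-up share one rule: perform
     * the "final" flush at most once, then mark the stream quiescent so
     * no further empty packet is generated for it.
     */
    static void stream_on_stop_destroy_or_hangup(struct stream *s)
    {
            pthread_mutex_lock(&s->lock);
            if (!s->quiescent) {
                    /* stand-in for the final flush done by the consumer */
                    s->quiescent = true;
            }
            pthread_mutex_unlock(&s->lock);
    }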

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/ust-app.c
src/common/consumer/consumer.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
index 934af39bd3f6028b69138765b39c22401e6dbfbe..6ee3975792bb061b46035d6d7b5923af1ab6fd0f 100644
--- a/src/bin/lttng-sessiond/consumer.c
+++ b/src/bin/lttng-sessiond/consumer.c
@@ -1191,6 +1191,38 @@ end:
        return ret;
 }
 
+/*
+ * Send a clear quiescent command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG2("Consumer clear quiescent channel key %" PRIu64, key);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
+       msg.u.clear_quiescent_channel.key = key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto end;
+       }
+
+end:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
+
 /*
  * Send a close metadata command to consumer using the given channel key.
  * Called with registry lock held.
diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h
index 75a40f8ae61c6a6837703902e13ad0ae8278272f..08b57eb73b6a7e932e57a8b1dae0ffea2a9bfc8f 100644
--- a/src/bin/lttng-sessiond/consumer.h
+++ b/src/bin/lttng-sessiond/consumer.h
@@ -284,6 +284,7 @@ int consumer_push_metadata(struct consumer_socket *socket,
                uint64_t metadata_key, char *metadata_str, size_t len,
                size_t target_offset, uint64_t version);
 int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key);
 int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
                struct consumer_output *consumer, uint64_t *discarded);
 int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c
index 17e33a1087dc837593520a2eeb3249267a5096f1..f30df2097fd36aea2b03d2f30f96d791226dc1a9 100644
--- a/src/bin/lttng-sessiond/ust-app.c
+++ b/src/bin/lttng-sessiond/ust-app.c
@@ -4647,6 +4647,155 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
        return ret;
 }
 
+static
+int ust_app_clear_quiescent_app_session(struct ust_app *app,
+               struct ust_app_session *ua_sess)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app_channel *ua_chan;
+       struct consumer_socket *socket;
+
+       DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
+
+       rcu_read_lock();
+
+       if (!app->compatible) {
+               goto end_not_compatible;
+       }
+
+       pthread_mutex_lock(&ua_sess->lock);
+
+       if (ua_sess->deleted) {
+               goto end_unlock;
+       }
+
+       health_code_update();
+
+       socket = consumer_find_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ERR("Failed to find consumer (%" PRIu32 ") socket",
+                               app->bits_per_long);
+               ret = -1;
+               goto end_unlock;
+       }
+
+       /* Clear quiescent state. */
+       switch (ua_sess->buffer_type) {
+       case LTTNG_BUFFER_PER_PID:
+               cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter,
+                               ua_chan, node.node) {
+                       health_code_update();
+                       ret = consumer_clear_quiescent_channel(socket,
+                                       ua_chan->key);
+                       if (ret) {
+                               ERR("Error clearing quiescent state for consumer channel");
+                               ret = -1;
+                               continue;
+                       }
+               }
+               break;
+       case LTTNG_BUFFER_PER_UID:
+       default:
+               assert(0);
+               ret = -1;
+               break;
+       }
+
+       health_code_update();
+
+end_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
+
+end_not_compatible:
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Clear quiescent state in each stream for all applications for a
+ * specific UST session.
+ * Called with UST session lock held.
+ */
+static
+int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
+
+{
+       int ret = 0;
+
+       DBG("Clearing stream quiescent state for all ust apps");
+
+       rcu_read_lock();
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct lttng_ht_iter iter;
+               struct buffer_reg_uid *reg;
+
+               /*
+                * Clear quiescent for all per UID buffers associated to
+                * that session.
+                */
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct consumer_socket *socket;
+                       struct buffer_reg_channel *reg_chan;
+
+                       /* Get associated consumer socket.*/
+                       socket = consumer_find_socket_by_bitness(
+                                       reg->bits_per_long, usess->consumer);
+                       if (!socket) {
+                               /*
+                                * Ignore request if no consumer is found for
+                                * the session.
+                                */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht,
+                                       &iter.iter, reg_chan, node.node) {
+                               /*
+                                * The following call will print error values so
+                                * the return code is of little importance
+                                * because whatever happens, we have to try them
+                                * all.
+                                */
+                               (void) consumer_clear_quiescent_channel(socket,
+                                               reg_chan->consumer_key);
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct ust_app_session *ua_sess;
+               struct lttng_ht_iter iter;
+               struct ust_app *app;
+
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app,
+                               pid_n.node) {
+                       ua_sess = lookup_session_by_app(usess, app);
+                       if (ua_sess == NULL) {
+                               continue;
+                       }
+                       (void) ust_app_clear_quiescent_app_session(app,
+                                       ua_sess);
+               }
+               break;
+       }
+       default:
+               ret = -1;
+               assert(0);
+               break;
+       }
+
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
 /*
  * Destroy a specific UST session in apps.
  */
@@ -4705,6 +4854,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
 
        rcu_read_lock();
 
+       /*
+        * In a start-stop-start use-case, we need to clear the quiescent state
+        * of each channel set by the prior stop command, thus ensuring that a
+        * following stop or destroy is sure to grab a timestamp_end near those
+        * operations, even if the packet is empty.
+        */
+       (void) ust_app_clear_quiescent_session(usess);
+
        cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
                ret = ust_app_start_trace(usess, app);
                if (ret < 0) {
diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h
index 59764e105c36ef086c77dec6508312e7d8b54d58..db5cbf6a01004710de022fae90bffff8e31bb53e 100644
--- a/src/common/consumer/consumer.h
+++ b/src/common/consumer/consumer.h
@@ -60,6 +60,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_STREAMS_SENT,
        LTTNG_CONSUMER_DISCARDED_EVENTS,
        LTTNG_CONSUMER_LOST_PACKETS,
+       LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
 };
 
 /* State of each fd in consumer */
@@ -249,6 +250,25 @@ struct lttng_consumer_stream {
        int data_read;
        int hangup_flush_done;
 
+       /*
+        * Whether the stream is in a "complete" state (e.g. it does not have a
+        * partially written sub-buffer).
+        *
+        * Initialized to "false" on stream creation (first packet is empty).
+        *
+        * The various transitions of the quiescent state are:
+        *     - On "start" tracing: set to false, since the stream is not
+        *       "complete".
+        *     - On "stop" tracing: if !quiescent -> flush FINAL (update
+        *       timestamp_end), and set to true; the stream has entered a
+        *       complete/quiescent state.
+        *     - On "destroy" or stream/application hang-up: if !quiescent ->
+        *       flush FINAL, and set to true.
+        *
+        * NOTE: Update and read are protected by the stream lock.
+        */
+       bool quiescent;
+
        /*
         * metadata_timer_lock protects flags waiting_on_metadata and
         * missed_metadata_flush.
diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h
index e7fd69ddd4388f6e288a8cacdf12fd9924f36b4d..09838655b6f64f3deb749d92e0c7ba185491cf4a 100644
--- a/src/common/sessiond-comm/sessiond-comm.h
+++ b/src/common/sessiond-comm/sessiond-comm.h
@@ -500,6 +500,9 @@ struct lttcomm_consumer_msg {
                struct {
                        uint64_t key;   /* Channel key. */
                } LTTNG_PACKED flush_channel;
+               struct {
+                       uint64_t key;   /* Channel key. */
+               } LTTNG_PACKED clear_quiescent_channel;
                struct {
                        char pathname[PATH_MAX];
                        /* Indicate if the snapshot goes on the relayd or locally. */
diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
index 4d1e7f155abd0af1474b32eeb9fdc49905127f4c..a66f305cd02e433f41bf31d0591c0e545d524a1a 100644
--- a/src/common/ust-consumer/ust-consumer.c
+++ b/src/common/ust-consumer/ust-consumer.c
@@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key)
 
                health_code_update();
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               pthread_mutex_lock(&stream->lock);
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+                       stream->quiescent = true;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Clear quiescent state from channel's streams using the given key to
+ * retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int clear_quiescent_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
+
+       rcu_read_lock();
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, clear quiescent state. */
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+
+               health_code_update();
+
+               pthread_mutex_lock(&stream->lock);
+               stream->quiescent = false;
+               pthread_mutex_unlock(&stream->lock);
        }
 error:
        rcu_read_unlock();
@@ -1582,6 +1629,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
+       {
+               int ret;
+
+               ret = clear_quiescent_channel(
+                               msg.u.clear_quiescent_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
@@ -1945,14 +2004,19 @@ int lttng_ustconsumer_get_sequence_number(
 }
 
 /*
- * Called when the stream signal the consumer that it has hang up.
+ * Called when the stream signals the consumer that it has hung up.
  */
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
        assert(stream);
        assert(stream->ustream);
 
-       ustctl_flush_buffer(stream->ustream, 0);
+       pthread_mutex_lock(&stream->lock);
+       if (!stream->quiescent) {
+               ustctl_flush_buffer(stream->ustream, 0);
+               stream->quiescent = true;
+       }
+       pthread_mutex_unlock(&stream->lock);
        stream->hangup_flush_done = 1;
 }
 