Fix: UST should not generate packet at destroy after stop
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 18 May 2016 18:04:11 +0000 (14:04 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 20 May 2016 20:40:18 +0000 (16:40 -0400)
In the following scenario:
- create, enable events (ust),
- start
- ...
- stop (await for data_pending to complete)
- destroy
- rm the trace directory

We would expect that the "rm" operation would not conflict with the
consumer daemon trying to output data into the trace files, since the
"stop" operation ensured that there was no data_pending.

However, the "destroy" operation currently generates an extra packet
after the data_pending check (the "on_stream_hangup"). This causes the
consumer daemon to try to perform trace file rotation concurrently with
the trace directory removal in the scenario above, which triggers
errors. The main reason why this empty packet is generated by "destroy"
is to deal with trace start/stop scenario which would otherwise generate
a completely empty stream.

Therefore, introduce the concept of a "quiescent stream". It is
initialized at false on stream creation (first packet is empty). When
tracing is started, it is set to false (for cases of start/stop/start).
When tracing is stopped, if the stream is not quiescent, perform a
"final" flush (which will generate an empty packet if the current packet
was empty), and set quiescent to true.  On "destroy" stream and on
application hangup: if the stream is not quiescent, perform a "final"
flush, and set the quiescent state to true.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/ust-app.c
src/common/consumer.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index f6f321bf9f07d7fe25398b4ab8f6ef58780f0ffa..674e1ea442f959d5e69d18819a5b3f8aefc48873 100644 (file)
@@ -1179,6 +1179,38 @@ end:
        return ret;
 }
 
+/*
+ * Send a clear quiescent command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+
+       DBG2("Consumer clear quiescent channel key %" PRIu64, key);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
+       msg.u.clear_quiescent_channel.key = key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto end;
+       }
+
+end:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
+
 /*
  * Send a close metadata command to consumer using the given channel key.
  * Called with registry lock held.
index ebff43f9db93cc38059b892ece881e8ceb154412..bb9c63228d20eb5e9be221804717a2c29bc23478 100644 (file)
@@ -279,6 +279,7 @@ int consumer_push_metadata(struct consumer_socket *socket,
                uint64_t metadata_key, char *metadata_str, size_t len,
                size_t target_offset);
 int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key);
 
 /* Snapshot command. */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
index df52f491f0912ac5f6bfb7c6bfbf1e4e9df09c7e..73d1177be5643cec31e9853bac8c375beb894c9f 100644 (file)
@@ -4405,6 +4405,155 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
        return ret;
 }
 
+static
+int ust_app_clear_quiescent_app_session(struct ust_app *app,
+               struct ust_app_session *ua_sess)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app_channel *ua_chan;
+       struct consumer_socket *socket;
+
+       DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
+
+       rcu_read_lock();
+
+       if (!app->compatible) {
+               goto end_not_compatible;
+       }
+
+       pthread_mutex_lock(&ua_sess->lock);
+
+       if (ua_sess->deleted) {
+               goto end_unlock;
+       }
+
+       health_code_update();
+
+       socket = consumer_find_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ERR("Failed to find consumer (%" PRIu32 ") socket",
+                               app->bits_per_long);
+               ret = -1;
+               goto end_unlock;
+       }
+
+       /* Clear quiescent state. */
+       switch (ua_sess->buffer_type) {
+       case LTTNG_BUFFER_PER_PID:
+               cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter,
+                               ua_chan, node.node) {
+                       health_code_update();
+                       ret = consumer_clear_quiescent_channel(socket,
+                                       ua_chan->key);
+                       if (ret) {
+                               ERR("Error clearing quiescent state for consumer channel");
+                               ret = -1;
+                               continue;
+                       }
+               }
+               break;
+       case LTTNG_BUFFER_PER_UID:
+       default:
+               assert(0);
+               ret = -1;
+               break;
+       }
+
+       health_code_update();
+
+end_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
+
+end_not_compatible:
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Clear quiescent state in each stream for all applications for a
+ * specific UST session.
+ * Called with UST session lock held.
+ */
+static
+int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
+
+{
+       int ret = 0;
+
+       DBG("Clearing stream quiescent state for all ust apps");
+
+       rcu_read_lock();
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct lttng_ht_iter iter;
+               struct buffer_reg_uid *reg;
+
+               /*
+                * Clear quiescent for all per UID buffers associated to
+                * that session.
+                */
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct consumer_socket *socket;
+                       struct buffer_reg_channel *reg_chan;
+
+                       /* Get associated consumer socket.*/
+                       socket = consumer_find_socket_by_bitness(
+                                       reg->bits_per_long, usess->consumer);
+                       if (!socket) {
+                               /*
+                                * Ignore request if no consumer is found for
+                                * the session.
+                                */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht,
+                                       &iter.iter, reg_chan, node.node) {
+                               /*
+                                * The following call will print error values so
+                                * the return code is of little importance
+                                * because whatever happens, we have to try them
+                                * all.
+                                */
+                               (void) consumer_clear_quiescent_channel(socket,
+                                               reg_chan->consumer_key);
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct ust_app_session *ua_sess;
+               struct lttng_ht_iter iter;
+               struct ust_app *app;
+
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app,
+                               pid_n.node) {
+                       ua_sess = lookup_session_by_app(usess, app);
+                       if (ua_sess == NULL) {
+                               continue;
+                       }
+                       (void) ust_app_clear_quiescent_app_session(app,
+                                       ua_sess);
+               }
+               break;
+       }
+       default:
+               ret = -1;
+               assert(0);
+               break;
+       }
+
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
 /*
  * Destroy a specific UST session in apps.
  */
@@ -4463,6 +4612,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
 
        rcu_read_lock();
 
+       /*
+        * In a start-stop-start use-case, we need to clear the quiescent state
+        * of each channel set by the prior stop command, thus ensuring that a
+        * following stop or destroy is sure to grab a timestamp_end near those
+        * operations, even if the packet is empty.
+        */
+       (void) ust_app_clear_quiescent_session(usess);
+
        cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
                ret = ust_app_start_trace(usess, app);
                if (ret < 0) {
index e50835e019f27176143ba857a572b52415c919f4..0f89d8eedc81dccc6548e12970b5a91b21dc5781 100644 (file)
@@ -58,6 +58,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
        LTTNG_CONSUMER_SNAPSHOT_METADATA,
        LTTNG_CONSUMER_STREAMS_SENT,
+       LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
 };
 
 /* State of each fd in consumer */
@@ -238,6 +239,25 @@ struct lttng_consumer_stream {
        int data_read;
        int hangup_flush_done;
 
+       /*
+        * Whether the stream is in a "complete" state (e.g. it does not have a
+        * partially written sub-buffer.
+        *
+        * Initialized to "false" on stream creation (first packet is empty).
+        *
+        * The various transitions of the quiescent state are:
+        *     - On "start" tracing: set to false, since the stream is not
+        *       "complete".
+        *     - On "stop" tracing: if !quiescent -> flush FINAL (update
+        *       timestamp_end), and set to true; the stream has entered a
+        *       complete/quiescent state.
+        *     - On "destroy" or stream/application hang-up: if !quiescent ->
+        *       flush FINAL, and set to true.
+        *
+        * NOTE: Update and read are protected by the stream lock.
+        */
+       bool quiescent;
+
        /*
         * metadata_timer_lock protects flags waiting_on_metadata and
         * missed_metadata_flush.
index 1426fcf203548dfe8c95ebeeae6050662ca917a1..8f2c9cb00ea9d993c1323b1bb050d8ecce473f3d 100644 (file)
@@ -445,6 +445,9 @@ struct lttcomm_consumer_msg {
                struct {
                        uint64_t key;   /* Channel key. */
                } LTTNG_PACKED flush_channel;
+               struct {
+                       uint64_t key;   /* Channel key. */
+               } LTTNG_PACKED clear_quiescent_channel;
                struct {
                        char pathname[PATH_MAX];
                        /* Indicate if the snapshot goes on the relayd or locally. */
index 152455ca64fa7e938834c01d697bb3b97e6198a8..4bb4f8d728794123dc6289e2253371e53b869ae5 100644 (file)
@@ -629,7 +629,54 @@ static int flush_channel(uint64_t chan_key)
 
                health_code_update();
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               pthread_mutex_lock(&stream->lock);
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+                       stream->quiescent = true;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Clear quiescent state from channel's streams using the given key to
+ * retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int clear_quiescent_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
+
+       rcu_read_lock();
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, clear quiescent state. */
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+
+               health_code_update();
+
+               pthread_mutex_lock(&stream->lock);
+               stream->quiescent = false;
+               pthread_mutex_unlock(&stream->lock);
        }
 error:
        rcu_read_unlock();
@@ -1419,6 +1466,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
+       {
+               int ret;
+
+               ret = clear_quiescent_channel(
+                               msg.u.clear_quiescent_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
@@ -1669,14 +1728,19 @@ int lttng_ustconsumer_get_current_timestamp(
 }
 
 /*
- * Called when the stream signal the consumer that it has hang up.
+ * Called when the stream signals the consumer that it has hung up.
  */
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
        assert(stream);
        assert(stream->ustream);
 
-       ustctl_flush_buffer(stream->ustream, 0);
+       pthread_mutex_lock(&stream->lock);
+       if (!stream->quiescent) {
+               ustctl_flush_buffer(stream->ustream, 0);
+               stream->quiescent = true;
+       }
+       pthread_mutex_unlock(&stream->lock);
        stream->hangup_flush_done = 1;
 }
 
This page took 0.033524 seconds and 4 git commands to generate.