health_code_update();
- ustctl_flush_buffer(stream->ustream, 1);
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->quiescent) {
+ ustctl_flush_buffer(stream->ustream, 0);
+ stream->quiescent = true;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+error:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Clear quiescent state from channel's streams using the given key to
+ * retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int clear_quiescent_channel(uint64_t chan_key)
+{
+ int ret = 0;
+ struct lttng_consumer_channel *channel;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht *ht;
+ struct lttng_ht_iter iter;
+
+ DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
+
+ rcu_read_lock();
+ channel = consumer_find_channel(chan_key);
+ if (!channel) {
+ ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
+ ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto error;
+ }
+
+ ht = consumer_data.stream_per_chan_id_ht;
+
+ /* For each stream of the channel id, clear quiescent state. */
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+ &channel->key, &iter.iter, stream, node_channel_id.node) {
+
+ health_code_update();
+
+ pthread_mutex_lock(&stream->lock);
+ stream->quiescent = false;
+ pthread_mutex_unlock(&stream->lock);
}
error:
rcu_read_unlock();
}
}
- ustctl_flush_buffer(stream->ustream, 1);
+ /*
+ * If tracing is active, we want to perform a "full" buffer flush.
+ * Else, if quiescent, it has already been done by the prior stop.
+ */
+ if (!stream->quiescent) {
+ ustctl_flush_buffer(stream->ustream, 0);
+ }
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
goto end_msg_sessiond;
}
+ case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
+ {
+ int ret;
+
+ ret = clear_quiescent_channel(
+ msg.u.clear_quiescent_channel.key);
+ if (ret != 0) {
+ ret_code = ret;
+ }
+
+ goto end_msg_sessiond;
+ }
case LTTNG_CONSUMER_PUSH_METADATA:
{
int ret;
}
case LTTNG_CONSUMER_DISCARDED_EVENTS:
{
- uint64_t ret;
+ int ret = 0;
+ uint64_t discarded_events;
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
* found (no events are dropped if the channel is not yet in
* use).
*/
- ret = 0;
+ discarded_events = 0;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&id, lttng_ht_seed),
ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
if (stream->chan->key == key) {
- ret = stream->chan->discarded_events;
+ discarded_events = stream->chan->discarded_events;
break;
}
}
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
if (ret < 0) {
PERROR("send discarded events");
goto error_fatal;
}
/*
- * Called when the stream signal the consumer that it has hang up.
+ * Called when the stream signals the consumer that it has hung up.
*/
void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
{
assert(stream);
assert(stream->ustream);
- ustctl_flush_buffer(stream->ustream, 0);
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->quiescent) {
+ ustctl_flush_buffer(stream->ustream, 0);
+ stream->quiescent = true;
+ }
+ pthread_mutex_unlock(&stream->lock);
stream->hangup_flush_done = 1;
}
}
}
}
- /* Try to rmdir all directories under shm_path root. */
- if (chan->root_shm_path[0]) {
- (void) run_as_recursive_rmdir(chan->root_shm_path,
- chan->uid, chan->gid);
- }
}
void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
consumer_metadata_cache_destroy(chan);
ustctl_destroy_channel(chan->uchan);
+ /* Try to rmdir all directories under shm_path root. */
+ if (chan->root_shm_path[0]) {
+ (void) run_as_recursive_rmdir(chan->root_shm_path,
+ chan->uid, chan->gid);
+ }
free(chan->stream_fds);
}
}
if (discarded < stream->last_discarded_events) {
/*
- * Overflow has occured. We assume only one wrap-around
- * has occured.
+ * Overflow has occurred. We assume only one wrap-around
+ * has occurred.
*/
stream->chan->discarded_events +=
(1ULL << (CAA_BITS_PER_LONG - 1)) -