/* Keep stream reference when creating metadata. */
if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
channel->metadata_stream = stream;
- stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0];
- stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1];
+ if (channel->monitor) {
+ /* Set metadata poll pipe if we created one */
+ memcpy(stream->ust_metadata_poll_pipe,
+ ust_metadata_pipe,
+ sizeof(ust_metadata_pipe));
+ }
}
}
* Returns 0 on success, < 0 on error
*/
static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
- uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
+ uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx)
{
int ret;
unsigned use_relayd = 0;
/*
* The original value is sent back if max stream size is larger than
- * the possible size of the snapshot. Also, we asume that the session
+ * the possible size of the snapshot. Also, we assume that the session
* daemon should never send a maximum stream size that is lower than
* subbuffer size.
*/
- consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
- produced_pos, max_stream_size);
+ consumed_pos = consumer_get_consume_start_pos(consumed_pos,
+ produced_pos, nb_packets_per_stream,
+ stream->max_sb_size);
while (consumed_pos < produced_pos) {
ssize_t read_len;
ret = snapshot_channel(msg.u.snapshot_channel.key,
msg.u.snapshot_channel.pathname,
msg.u.snapshot_channel.relayd_id,
- msg.u.snapshot_channel.max_stream_size,
+ msg.u.snapshot_channel.nb_packets_per_stream,
ctx);
if (ret < 0) {
ERR("Snapshot channel failed");
goto end;
}
- ret = ustctl_put_next_subbuf(ustream);
+ ret = ustctl_put_subbuf(ustream);
assert(!ret);
/* This stream still has data. Flag it and wake up the data thread. */
*/
DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
contiguous, pushed);
- assert(((int64_t) contiguous - pushed) >= 0);
+ assert(((int64_t) (contiguous - pushed)) >= 0);
if ((contiguous != pushed) ||
(((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
ret = 1; /* Data is pending */
pthread_mutex_unlock(&ctx->metadata_socket_lock);
return ret;
}
+
+/*
+ * Return the ustctl call for the get stream id.
+ */
+int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
+ uint64_t *stream_id)
+{
+ assert(stream);
+ assert(stream_id);
+
+ return ustctl_get_stream_id(stream->ustream, stream_id);
+}