case LTTNG_CONSUMER_ADD_STREAM:
{
struct lttng_consumer_stream *new_stream;
- int fds[2];
+ int fds[2], stream_pipe;
size_t nb_fd = 2;
struct consumer_relayd_sock_pair *relayd = NULL;
int alloc_ret = 0;
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
+ msg.u.stream.session_id,
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
}
}
- /* Send stream to the metadata thread */
+ /* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
- do {
- ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
- sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata pipe");
- consumer_del_metadata_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
- do {
- ret = write(ctx->consumer_poll_pipe[1], &new_stream,
- sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write data pipe");
- consumer_del_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_data_pipe[1];
}
- DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
+ do {
+ ret = write(stream_pipe, &new_stream, sizeof(new_stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Consumer write %s stream to pipe %d",
+ new_stream->metadata_flag ? "metadata" : "data",
+ stream_pipe);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
+ }
+
+ DBG("UST consumer ADD_STREAM %s (%d,%d) with relayd id %" PRIu64,
msg.u.stream.path_name, fds[0], fds[1],
new_stream->relayd_stream_id);
break;
{
rcu_read_unlock();
return -ENOSYS;
-#if 0
- if (ctx->on_update_stream != NULL) {
- ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
- if (ret == 0) {
- consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
- } else if (ret < 0) {
- goto end;
- }
- } else {
- consumer_change_stream_state(msg.u.stream.stream_key,
- msg.u.stream.state);
+ }
+ case LTTNG_CONSUMER_DATA_AVAILABLE:
+ {
+ int32_t ret;
+ uint64_t id = msg.u.data_available.session_id;
+
+ DBG("UST consumer data available command for id %" PRIu64, id);
+
+ ret = consumer_data_available(id);
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send data available ret code");
}
break;
-#endif
}
default:
break;
error:
return ret;
}
+
+/*
+ * Check if data is still being extracted from the buffers for a specific
+ * stream. Consumer data lock MUST be acquired before calling this function.
+ *
+ * Return 0 if the traced data are still getting read else 1 meaning that the
+ * data is available for trace viewer reading.
+ */
+int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ DBG("UST consumer checking data availability");
+
+ /*
+ * Try to lock the stream mutex. On failure, we know that the stream is
+ * being used else where hence there is data still being extracted.
+ */
+ ret = pthread_mutex_trylock(&stream->lock);
+ if (ret == EBUSY) {
+ goto data_not_available;
+ }
+ /* The stream is now locked so we can do our ustctl calls */
+
+ ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf);
+ if (ret == 0) {
+ /* There is still data so let's put back this subbuffer. */
+ ret = ustctl_put_subbuf(stream->chan->handle, stream->buf);
+ assert(ret == 0);
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_available;
+ }
+
+ /* Data is available to be read for this stream. */
+ pthread_mutex_unlock(&stream->lock);
+ return 1;
+
+data_not_available:
+ return 0;
+}