DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
fds[0], fds[1]);
- new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
+ assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
+ new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
msg.u.stream.stream_key,
fds[0], fds[1],
msg.u.stream.state,
msg.u.stream.mmap_len,
msg.u.stream.output,
- msg.u.stream.path_name);
+ msg.u.stream.path_name,
+ msg.u.stream.uid,
+ msg.u.stream.gid);
if (new_stream == NULL) {
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
goto end;
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
+ return -ENOSYS;
+#if 0
if (ctx->on_update_stream != NULL) {
ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
if (ret == 0) {
consumer_change_stream_state(msg.u.stream.stream_key,
msg.u.stream.state);
}
+#endif
break;
}
default:
return -ENOMEM;
}
/*
- * The channel shm and wait fds are passed to ustctl, set them
- * to -1 here.
+ * The channel fds are passed to ustctl, we only keep a copy.
*/
- chan->shm_fd = -1;
- chan->wait_fd = -1;
+ chan->shm_fd_is_copy = 1;
+ chan->wait_fd_is_copy = 1;
+
return 0;
}
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+ ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+ stream->hangup_flush_done = 1;
+}
+
void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
{
ustctl_unmap_channel(chan->handle);
return -EINVAL;
}
/*
- * The stream shm and wait fds are passed to ustctl, set them to
- * -1 here.
+ * The stream fds are passed to ustctl, we only keep a copy.
*/
- stream->shm_fd = -1;
- stream->wait_fd = -1;
+ stream->shm_fd_is_copy = 1;
+ stream->wait_fd_is_copy = 1;
return 0;
}
{
ustctl_close_stream_read(stream->chan->handle, stream->buf);
}
+
+
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ unsigned long len;
+ int err;
+ long ret = 0;
+ struct lttng_ust_shm_handle *handle;
+ struct lttng_ust_lib_ring_buffer *buf;
+ char dummy;
+ ssize_t readlen;
+
+ DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
+ stream->wait_fd, stream->key);
+
+ /* We can consume the 1 byte written into the wait_fd by UST */
+ if (!stream->hangup_flush_done) {
+ do {
+ readlen = read(stream->wait_fd, &dummy, 1);
+ } while (readlen == -1 && errno == -EINTR);
+ if (readlen == -1) {
+ ret = readlen;
+ goto end;
+ }
+ }
+
+ buf = stream->buf;
+ handle = stream->chan->handle;
+ /* Get the next subbuffer */
+ err = ustctl_get_next_subbuf(handle, buf);
+ if (err != 0) {
+ ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
+ /*
+ * This is a debug message even for single-threaded consumer,
+ * because poll() have more relaxed criterions than get subbuf,
+ * so get_subbuf may fail for short race windows where poll()
+ * would issue wakeups.
+ */
+ DBG("Reserving sub buffer failed (everything is normal, "
+ "it is due to concurrency)");
+ goto end;
+ }
+ assert(stream->output == LTTNG_EVENT_MMAP);
+ /* read the used subbuffer size */
+ err = ustctl_get_padded_subbuf_size(handle, buf, &len);
+ assert(err == 0);
+ /* write the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error writing to tracefile");
+ }
+ err = ustctl_put_next_subbuf(handle, buf);
+ assert(err == 0);
+end:
+ return ret;
+}
+
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ /* Opening the tracefile in write mode */
+ if (stream->path_name != NULL) {
+ ret = open(stream->path_name,
+ O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+ if (ret < 0) {
+ ERR("Opening %s", stream->path_name);
+ perror("open");
+ goto error;
+ }
+ stream->out_fd = ret;
+ ret = chown(stream->path_name, stream->uid, stream->gid);
+ if (ret < 0) {
+ ERR("Changing ownership of %s", stream->path_name);
+ perror("chown");
+ }
+ }
+
+ /* we return 0 to let the library handle the FD internally */
+ return 0;
+
+error:
+ return ret;
+}