#define _GNU_SOURCE
#include <assert.h>
+#include <lttng/ust-ctl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
-#include <lttng/ust-ctl.h>
#include <common/common.h>
#include <common/sessiond-comm/sessiond-comm.h>
extern volatile int consumer_quit;
/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written, else negative value on error.
- */
-ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
-{
- unsigned long mmap_offset;
- long ret = 0, written = 0;
- off_t orig_offset = stream->out_fd_offset;
- int outfd = stream->out_fd;
- uint64_t metadata_id;
- struct consumer_relayd_sock_pair *relayd = NULL;
-
- /* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
- goto end;
- }
- }
-
- /* get the offset inside the fd to mmap */
- ret = ustctl_get_mmap_read_offset(stream->chan->handle,
- stream->buf, &mmap_offset);
- if (ret != 0) {
- errno = -ret;
- PERROR("ustctl_get_mmap_read_offset");
- written = ret;
- goto end;
- }
-
- /* Handle stream on the relayd if the output is on the network */
- if (relayd) {
- if (stream->metadata_flag) {
- /* Only lock if metadata since we use the control socket. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- }
-
- ret = consumer_handle_stream_before_relayd(stream, len);
- if (ret >= 0) {
- outfd = ret;
-
- /* Write metadata stream id before payload */
- if (stream->metadata_flag) {
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(outfd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- if (ret < 0) {
- PERROR("write metadata stream id");
- written = ret;
- goto end;
- }
- } while (errno == EINTR);
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
- }
- }
- /* Else, use the default set before which is the filesystem. */
- }
-
- while (len > 0) {
- ret = write(outfd, stream->mmap_base + mmap_offset, len);
- if (ret < 0) {
- if (errno == EINTR) {
- /* restart the interrupted system call */
- continue;
- } else {
- PERROR("Error in file write");
- if (written == 0) {
- written = ret;
- }
- goto end;
- }
- } else if (ret > len) {
- PERROR("ret %ld > len %lu", ret, len);
- written += ret;
- goto end;
- } else {
- len -= ret;
- mmap_offset += ret;
- }
- DBG("UST mmap write() ret %ld (len %lu)", ret, len);
-
- /* This call is useless on a socket so better save a syscall. */
- if (!relayd) {
- /* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
- SYNC_FILE_RANGE_WRITE);
- stream->out_fd_offset += ret;
- }
- written += ret;
- }
- lttng_consumer_sync_trace_file(stream, orig_offset);
-
-end:
- if (relayd && stream->metadata_flag) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- }
- return written;
-}
-
-/*
- * Splice the data from the ring buffer to the tracefile.
- *
- * Returns the number of bytes spliced.
+ * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
+ * compiled out, we isolate it in this library.
*/
-ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
+int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle,
+ struct lttng_ust_lib_ring_buffer *buf, unsigned long *off)
{
- return -ENOSYS;
-}
+ return ustctl_get_mmap_read_offset(handle, buf, off);
+};
/*
* Take a snapshot for a specific fd
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
+ DBG("Consumer received unexpected message size %zd (expects %zu)",
+ ret, sizeof(msg));
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
return ret;
}
return -ENOENT;
}
+ /* relayd need RCU read-side lock */
+ rcu_read_lock();
+
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
int fds[1];
size_t nb_fd = 1;
+ DBG("UST Consumer adding channel");
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
size_t nb_fd = 2;
struct consumer_relayd_sock_pair *relayd = NULL;
+ DBG("UST Consumer adding stream");
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
return ret;
}
+ DBG("consumer_add_stream chan %d stream %d",
+ msg.u.stream.channel_key,
+ msg.u.stream.stream_key);
+
assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
- new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
+ new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
msg.u.stream.stream_key,
fds[0], fds[1],
msg.u.stream.state,
new_stream->relayd_stream_id);
break;
}
+ case LTTNG_CONSUMER_DESTROY_RELAYD:
+ {
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("UST consumer destroying relayd %zu",
+ msg.u.destroy_relayd.net_seq_idx);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
+ if (relayd == NULL) {
+ ERR("Unable to find relayd %zu",
+ msg.u.destroy_relayd.net_seq_idx);
+ }
+
+ /* Set destroy flag for this object */
+ uatomic_set(&relayd->destroy_flag, 1);
+
+ /* Destroy the relayd if refcount is 0 else set the destroy flag. */
+ if (uatomic_read(&relayd->refcount) == 0) {
+ consumer_destroy_relayd(relayd);
+ }
+ break;
+ }
case LTTNG_CONSUMER_UPDATE_STREAM:
{
return -ENOSYS;
*/
do {
ret = write(ctx->consumer_poll_pipe[1], "", 1);
- } while (ret == -1UL && errno == EINTR);
+ } while (ret < 0 && errno == EINTR);
end_nosignal:
+ rcu_read_unlock();
return 0;
}
* display the error but continue processing to try
* to release the subbuffer
*/
- ERR("Error writing to tracefile");
+ ERR("Error writing to tracefile (expected: %ld, got: %ld)", ret, len);
}
err = ustctl_put_next_subbuf(handle, buf);
assert(err == 0);