#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/kernel-ctl/kernel-ctl.h>
uatomic_dec(&relayd->refcount);
assert(uatomic_read(&relayd->refcount) >= 0);
+ /* Closing streams requires to lock the control socket. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_close_stream(&relayd->control_sock,
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Unable to close stream on the relayd. Continuing");
- /* Continue here. There is nothing we can do for the relayd.*/
+ DBG("Unable to close stream on the relayd. Continuing");
+ /*
+ * Continue here. There is nothing we can do for the relayd.
+ * Chances are that the relayd has closed the socket so we just
+ * continue cleaning up.
+ */
}
/* Both conditions are met, we destroy the relayd. */
}
/*
- * Add relayd socket to global consumer data hashtable.
+ * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
+ * be acquired before calling this.
*/
+
int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
goto end;
}
- rcu_read_lock();
-
lttng_ht_lookup(consumer_data.relayd_ht,
(void *)((unsigned long) relayd->net_seq_idx), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
- rcu_read_unlock();
/* Relayd already exist. Ignore the insertion */
goto end;
}
lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
- rcu_read_unlock();
-
end:
return ret;
}
*
* Return destination file descriptor or negative value on error.
*/
-int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
- size_t data_size)
+static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
+ size_t data_size, struct consumer_relayd_sock_pair *relayd)
{
int outfd = -1, ret;
- struct consumer_relayd_sock_pair *relayd;
struct lttcomm_relayd_data_hdr data_hdr;
/* Safety net */
assert(stream);
+ assert(relayd);
/* Reset data header */
memset(&data_hdr, 0, sizeof(data_hdr));
- rcu_read_lock();
- /* Get relayd reference of the stream. */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
- /* Stream is either local or corrupted */
- goto error;
- }
-
- DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
if (stream->metadata_flag) {
/* Caller MUST acquire the relayd control socket lock */
ret = relayd_send_metadata(&relayd->control_sock, data_size);
}
error:
- rcu_read_unlock();
return outfd;
}
}
/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
+ * Write the metadata stream id on the specified file descriptor.
+ */
+static int write_relayd_metadata_id(int fd,
+ struct lttng_consumer_stream *stream,
+ struct consumer_relayd_sock_pair *relayd)
+{
+ int ret;
+ uint64_t metadata_id;
+
+ metadata_id = htobe64(stream->relayd_stream_id);
+ do {
+ ret = write(fd, (void *) &metadata_id,
+ sizeof(stream->relayd_stream_id));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write metadata stream id");
+ goto end;
+ }
+ DBG("Metadata stream id %" PRIu64 " written before data",
+ stream->relayd_stream_id);
+
+end:
+ return ret;
+}
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile. This is a
+ * core function for writing trace buffers to either the local filesystem or
+ * the network.
+ *
+ * Careful review MUST be put if any changes occur!
*
* Returns the number of bytes written
*/
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
int outfd = stream->out_fd;
- uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
/* RCU lock for the relayd pointer */
netlen += sizeof(stream->relayd_stream_id);
}
- ret = consumer_handle_stream_before_relayd(stream, netlen);
+ ret = write_relayd_stream_header(stream, netlen, relayd);
if (ret >= 0) {
/* Use the returned socket. */
outfd = ret;
/* Write metadata stream id before payload */
if (stream->metadata_flag) {
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(outfd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- } while (ret < 0 && errno == EINTR);
+ ret = write_relayd_metadata_id(outfd, stream, relayd);
if (ret < 0) {
- PERROR("write metadata stream id");
written = ret;
goto end;
}
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
- /*
- * We do this so the return value can match the len passed as
- * argument to this function.
- */
- written -= sizeof(stream->relayd_stream_id);
}
}
/* Else, use the default set before which is the filesystem. */
}
goto end;
} else if (ret > len) {
- PERROR("Error in file write (ret %ld > len %lu)", ret, len);
+ PERROR("Error in file write (ret %zd > len %lu)", ret, len);
written += ret;
goto end;
} else {
len -= ret;
mmap_offset += ret;
}
- DBG("Consumer mmap write() ret %ld (len %lu)", ret, len);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
int fd = stream->wait_fd;
/* Default is on the disk */
int outfd = stream->out_fd;
- uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
switch (consumer_data.type) {
*/
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- } while (ret < 0 && errno == EINTR);
+ ret = write_relayd_metadata_id(ctx->consumer_thread_pipe[1],
+ stream, relayd);
if (ret < 0) {
- PERROR("write metadata stream id");
written = ret;
goto end;
}
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
}
while (len > 0) {
written -= sizeof(stream->relayd_stream_id);
}
- ret = consumer_handle_stream_before_relayd(stream, ret_splice);
+ ret = write_relayd_stream_header(stream, ret_splice, relayd);
if (ret >= 0) {
/* Use the returned socket. */
outfd = ret;
} else {
- if (outfd == -1) {
- ERR("Remote relayd disconnected. Stopping");
- goto end;
- }
+ ERR("Remote relayd disconnected. Stopping");
+ goto end;
}
}