#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
+#include <inttypes.h>
#include <common/common.h>
#include <common/kernel-ctl/kernel-ctl.h>
call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
}
+/*
+ * Flag a relayd socket pair for destruction. Destroy it if the refcount
+ * reaches zero.
+ *
+ * RCU read side lock MUST be aquired before calling this function.
+ */
+void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
+{
+ assert(relayd);
+
+ /* Set destroy flag for this object */
+ uatomic_set(&relayd->destroy_flag, 1);
+
+ /* Destroy the relayd if refcount is 0 */
+ if (uatomic_read(&relayd->refcount) == 0) {
+ consumer_destroy_relayd(relayd);
+ }
+}
+
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
PERROR("write metadata stream id");
goto end;
}
- DBG("Metadata stream id %zu written before data",
+ DBG("Metadata stream id %" PRIu64 " written before data",
stream->relayd_stream_id);
end:
}
goto end;
} else if (ret > len) {
- PERROR("Error in file write (ret %ld > len %lu)", ret, len);
+ PERROR("Error in file write (ret %zd > len %lu)", ret, len);
written += ret;
goto end;
} else {
len -= ret;
mmap_offset += ret;
}
- DBG("Consumer mmap write() ret %ld (len %lu)", ret, len);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* send the appropriate error description to sessiond */
switch (ret) {
case EBADF:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
break;
case EINVAL:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
break;
case ENOMEM:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
break;
case ESPIPE:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
break;
}
metadata_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
- lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
goto restart;
}
perror("Poll error");
- lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
goto end;
} else if (num_rdy == 0) {
DBG("Polling thread timed out");
}
DBG("Sending ready command to lttng-sessiond");
- ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
+ ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
/* return < 0 on error, but == 0 is not fatal */
if (ret < 0) {
ERR("Error sending ready command to lttng-sessiond");
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
+
+/*
+ * Process the ADD_RELAYD command receive by a consumer.
+ *
+ * This will create a relayd socket pair and add it to the relayd hash table.
+ * The caller MUST acquire a RCU read side lock before calling it.
+ */
+int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+ struct lttng_consumer_local_data *ctx, int sock,
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+{
+ int fd, ret = -1;
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(net_seq_idx);
+ if (relayd == NULL) {
+ /* Not found. Allocate one. */
+ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
+ if (relayd == NULL) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ goto error;
+ }
+ }
+
+ /* Poll on consumer socket. */
+ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = -EINTR;
+ goto error;
+ }
+
+ /* Get relayd socket from session daemon */
+ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+ if (ret != sizeof(fd)) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ ret = -1;
+ goto error;
+ }
+
+ /* Copy socket information and received FD */
+ switch (sock_type) {
+ case LTTNG_STREAM_CONTROL:
+ /* Copy received lttcomm socket */
+ lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
+ ret = lttcomm_create_sock(&relayd->control_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Close the created socket fd which is useless */
+ close(relayd->control_sock.fd);
+
+ /* Assign new file descriptor */
+ relayd->control_sock.fd = fd;
+ break;
+ case LTTNG_STREAM_DATA:
+ /* Copy received lttcomm socket */
+ lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
+ ret = lttcomm_create_sock(&relayd->data_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Close the created socket fd which is useless */
+ close(relayd->data_sock.fd);
+
+ /* Assign new file descriptor */
+ relayd->data_sock.fd = fd;
+ break;
+ default:
+ ERR("Unknown relayd socket type (%d)", sock_type);
+ goto error;
+ }
+
+ DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+ sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
+ relayd->net_seq_idx, fd);
+
+ /*
+ * Add relayd socket pair to consumer data hashtable. If object already
+ * exists or on error, the function gracefully returns.
+ */
+ consumer_add_relayd(relayd);
+
+ /* All good! */
+ ret = 0;
+
+error:
+ return ret;
+}