#include "consumer.h"
+/*
+ * Find a consumer_socket in a consumer_output hashtable. Read side lock must
+ * be acquired before calling this function and across use of the
+ * returned consumer_socket.
+ */
+struct consumer_socket *consumer_find_socket(int key,
+ struct consumer_output *consumer)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct consumer_socket *socket = NULL;
+
+ /* Negative keys are lookup failures */
+ if (key < 0) {
+ return NULL;
+ }
+
+ lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ socket = caa_container_of(node, struct consumer_socket, node);
+ }
+
+ return socket;
+}
+
+/*
+ * Allocate a new consumer_socket and return the pointer.
+ */
+struct consumer_socket *consumer_allocate_socket(int fd)
+{
+ struct consumer_socket *socket = NULL;
+
+ socket = zmalloc(sizeof(struct consumer_socket));
+ if (socket == NULL) {
+ PERROR("zmalloc consumer socket");
+ goto error;
+ }
+
+ socket->fd = fd;
+ lttng_ht_node_init_ulong(&socket->node, fd);
+
+error:
+ return socket;
+}
+
+/*
+ * Add consumer socket to consumer output object. Read side lock must be
+ * acquired before calling this function.
+ */
+void consumer_add_socket(struct consumer_socket *sock,
+ struct consumer_output *consumer)
+{
+ assert(sock);
+ assert(consumer);
+
+ lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
+}
+
+/*
+ * Delte consumer socket to consumer output object. Read side lock must be
+ * acquired before calling this function.
+ */
+void consumer_del_socket(struct consumer_socket *sock,
+ struct consumer_output *consumer)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+
+ assert(sock);
+ assert(consumer);
+
+ iter.iter.node = &sock->node.node;
+ ret = lttng_ht_del(consumer->socks, &iter);
+ assert(!ret);
+}
+
+/*
+ * RCU destroy call function.
+ */
+static void destroy_socket_rcu(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct consumer_socket *socket =
+ caa_container_of(node, struct consumer_socket, node);
+
+ free(socket);
+}
+
+/*
+ * Destroy and free socket pointer in a call RCU. Read side lock must be
+ * acquired before calling this function.
+ */
+void consumer_destroy_socket(struct consumer_socket *sock)
+{
+ assert(sock);
+
+ /*
+ * We DO NOT close the file descriptor here since it is global to the
+ * session daemon and is closed only if the consumer dies.
+ */
+
+ call_rcu(&sock->node.head, destroy_socket_rcu);
+}
+
/*
* Allocate and assign data to a consumer_output object.
*
output->enabled = 1;
output->type = type;
output->net_seq_index = -1;
- /*
- * Important to keep it to a negative value on creation since it was zeroed
- * during allocation and the file descriptor 0 is a valid one.
- */
- output->sock = -1;
+
+ output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
error:
return output;
return;
}
- if (obj->sock >= 0) {
- (void) close(obj->sock);
+ if (obj->socks) {
+ struct lttng_ht_iter iter;
+ struct consumer_socket *socket;
+
+ cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+ consumer_destroy_socket(socket);
+ }
}
+
free(obj);
}
*/
struct consumer_output *consumer_copy_output(struct consumer_output *obj)
{
+ struct lttng_ht_iter iter;
+ struct consumer_socket *socket, *copy_sock;
struct consumer_output *output;
assert(obj);
memcpy(output, obj, sizeof(struct consumer_output));
+ /* Copy sockets */
+ output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+
+ cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+ /* Create new socket object. */
+ copy_sock = consumer_allocate_socket(socket->fd);
+ if (copy_sock == NULL) {
+ goto malloc_error;
+ }
+
+ copy_sock->lock = socket->lock;
+ consumer_add_socket(copy_sock, output);
+ }
+
error:
return output;
+
+malloc_error:
+ consumer_destroy_output(output);
+ return NULL;
}
/*
msg.u.relayd_sock.type = type;
memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
- DBG3("Sending relayd sock info to consumer");
+ DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
if (ret < 0) {
PERROR("send consumer relayd socket info");
error:
return ret;
}
+
+/*
+ * Send destroy relayd command to consumer.
+ *
+ * On success return positive value. On error, negative value.
+ */
+int consumer_send_destroy_relayd(struct consumer_socket *sock,
+ struct consumer_output *consumer)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ assert(consumer);
+ assert(sock);
+
+ DBG2("Sending destroy relayd command to consumer...");
+
+ /* Bail out if consumer is disabled */
+ if (!consumer->enabled) {
+ ret = LTTCOMM_OK;
+ DBG3("Consumer is disabled");
+ goto error;
+ }
+
+ msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
+ msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
+
+ pthread_mutex_lock(sock->lock);
+ ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
+ pthread_mutex_unlock(sock->lock);
+ if (ret < 0) {
+ PERROR("send consumer destroy relayd command");
+ goto error;
+ }
+
+ DBG2("Consumer send destroy relayd command done");
+
+error:
+ return ret;
+}