#include "consumer.h"
+/*
+ * Receive a reply command status message from the consumer. Consumer socket
+ * lock MUST be acquired before calling this function.
+ *
+ * Return 0 on success, -1 on recv error or a negative lttng error code which
+ * was possibly returned by the consumer.
+ */
+int consumer_recv_status_reply(struct consumer_socket *sock)
+{
+ int ret;
+ struct lttcomm_consumer_status_msg reply;
+
+ assert(sock);
+
+ ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply));
+ if (ret < 0) {
+ PERROR("recv consumer status msg");
+ goto end;
+ }
+
+ if (reply.ret_code == LTTNG_OK) {
+ /* All good. */
+ ret = 0;
+ } else {
+ ret = -reply.ret_code;
+ ERR("Consumer ret code %d", reply.ret_code);
+ }
+
+end:
+ return ret;
+}
+
/*
* Send destroy relayd command to consumer.
*
pthread_mutex_lock(sock->lock);
ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
- pthread_mutex_unlock(sock->lock);
if (ret < 0) {
- PERROR("send consumer destroy relayd command");
- goto error;
+ /* Indicate that the consumer is probably closing at this point. */
+ DBG("send consumer destroy relayd command");
+ goto error_send;
}
+ /* Don't check the return value. The caller will do it. */
+ ret = consumer_recv_status_reply(sock);
+
DBG2("Consumer send destroy relayd command done");
+error_send:
+ pthread_mutex_unlock(sock->lock);
error:
return ret;
}
*/
void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
{
- int ret;
struct lttng_ht_iter iter;
struct consumer_socket *socket;
rcu_read_lock();
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
node.node) {
+ int ret;
+
/* Send destroy relayd command */
ret = consumer_send_destroy_relayd(socket, consumer);
if (ret < 0) {
- ERR("Unable to send destroy relayd command to consumer");
+ DBG("Unable to send destroy relayd command to consumer");
/* Continue since we MUST delete everything at this point. */
}
}
*/
struct consumer_output *consumer_copy_output(struct consumer_output *obj)
{
+ struct lttng_ht *tmp_ht_ptr;
struct lttng_ht_iter iter;
struct consumer_socket *socket, *copy_sock;
struct consumer_output *output;
if (output == NULL) {
goto error;
}
+ /* Avoid losing the HT reference after the memcpy() */
+ tmp_ht_ptr = output->socks;
memcpy(output, obj, sizeof(struct consumer_output));
- /* Copy sockets */
- output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ /* Putting back the HT pointer and start copying socket(s). */
+ output->socks = tmp_ht_ptr;
cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
/* Create new socket object. */
goto malloc_error;
}
+ copy_sock->registered = socket->registered;
copy_sock->lock = socket->lock;
consumer_add_socket(copy_sock, output);
}
/*
* Send file descriptor to consumer via sock.
*/
-int consumer_send_fds(int sock, int *fds, size_t nb_fd)
+int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
{
int ret;
assert(fds);
+ assert(sock);
assert(nb_fd > 0);
- ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
+ ret = lttcomm_send_fds_unix_sock(sock->fd, fds, nb_fd);
if (ret < 0) {
PERROR("send consumer fds");
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+
error:
return ret;
}
/*
* Consumer send channel communication message structure to consumer.
*/
-int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
+int consumer_send_channel(struct consumer_socket *sock,
+ struct lttcomm_consumer_msg *msg)
{
int ret;
assert(msg);
- assert(sock >= 0);
+ assert(sock);
+ assert(sock->fd >= 0);
- ret = lttcomm_send_unix_sock(sock, msg,
+ ret = lttcomm_send_unix_sock(sock->fd, msg,
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
PERROR("send consumer channel");
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+
error:
return ret;
}
/*
* Send stream communication structure to the consumer.
*/
-int consumer_send_stream(int sock, struct consumer_output *dst,
- struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
+int consumer_send_stream(struct consumer_socket *sock,
+ struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
+ int *fds, size_t nb_fd)
{
int ret;
assert(msg);
assert(dst);
+ assert(sock);
switch (dst->type) {
case CONSUMER_DST_NET:
}
/* Send on socket */
- ret = lttcomm_send_unix_sock(sock, msg,
+ ret = lttcomm_send_unix_sock(sock->fd, msg,
sizeof(struct lttcomm_consumer_msg));
if (ret < 0) {
PERROR("send consumer stream");
goto error;
}
+ ret = consumer_recv_status_reply(sock);
+ if (ret < 0) {
+ goto error;
+ }
+
ret = consumer_send_fds(sock, fds, nb_fd);
if (ret < 0) {
goto error;
*
* On success return positive value. On error, negative value.
*/
-int consumer_send_relayd_socket(int consumer_sock,
+int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
struct lttcomm_sock *sock, struct consumer_output *consumer,
- enum lttng_stream_type type)
+ enum lttng_stream_type type, unsigned int session_id)
{
int ret;
struct lttcomm_consumer_msg msg;
/* Code flow error. Safety net. */
assert(sock);
assert(consumer);
+ assert(consumer_sock);
/* Bail out if consumer is disabled */
if (!consumer->enabled) {
*/
msg.u.relayd_sock.net_index = consumer->net_seq_index;
msg.u.relayd_sock.type = type;
+ msg.u.relayd_sock.session_id = session_id;
memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
- DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
- ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+ DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
+ ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
if (ret < 0) {
PERROR("send consumer relayd socket info");
goto error;
}
+ ret = consumer_recv_status_reply(consumer_sock);
+ if (ret < 0) {
+ goto error;
+ }
+
DBG3("Sending relayd socket file descriptor to consumer");
ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
if (ret < 0) {
}
/*
- * Ask the consumer if the data is ready to bread (available) for the specific
+ * Ask the consumer if the data is ready to read (NOT pending) for the specific
* session id.
*
* This function has a different behavior with the consumer i.e. that it waits
- * for a reply from the consumer if yes or no the data is available.
+ * for a reply from the consumer if yes or no the data is pending.
*/
-int consumer_is_data_available(unsigned int id,
+int consumer_is_data_pending(unsigned int id,
struct consumer_output *consumer)
{
int ret;
- int32_t ret_code = 1; /* Default is that the data is available */
+ int32_t ret_code = 0; /* Default is that the data is NOT pending */
struct consumer_socket *socket;
struct lttng_ht_iter iter;
struct lttcomm_consumer_msg msg;
assert(consumer);
- msg.cmd_type = LTTNG_CONSUMER_DATA_AVAILABLE;
+ msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
- msg.u.data_available.session_id = (uint64_t) id;
+ msg.u.data_pending.session_id = (uint64_t) id;
- DBG3("Consumer data available for id %u", id);
+ DBG3("Consumer data pending for id %u", id);
/* Send command for each consumer */
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
if (ret < 0) {
- PERROR("send consumer data available command");
+ PERROR("send consumer data pending command");
pthread_mutex_unlock(socket->lock);
goto error;
}
/*
- * Waiting for the reply code where 0 the data is not available and 1
- * it is for trace reading.
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
*/
+
ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
if (ret < 0) {
- PERROR("recv consumer data available status");
+ PERROR("recv consumer data pending status");
pthread_mutex_unlock(socket->lock);
goto error;
}
pthread_mutex_unlock(socket->lock);
- if (ret_code == 0) {
+ if (ret_code == 1) {
break;
}
}
- DBG("Consumer data available ret %d", ret_code);
+ DBG("Consumer data pending ret %d", ret_code);
return ret_code;
error: