static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
-/*
- * This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond session
- * id.
- *
- * Node can be added when a relayd communication is opened in the sessiond
- * thread.
- *
- * Note that a session id of the session daemon is unique to a tracing session
- * and not to a domain session. However, a domain session has one consumer
- * which forces the 1-1 mapping between a consumer and a domain session (ex:
- * UST). This means that we can't have duplicate in this ht.
- */
-static struct lttng_ht *relayd_session_id_ht;
-
/*
* Notify a thread pipe to poll back again. This usually means that some global
* state has changed so we just send back the thread in a poll wait call.
{
int ret;
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
if (relayd == NULL) {
return;
DBG("Consumer destroy and close relayd socket pair");
- /* Loockup for a relayd node in the session id map hash table. */
- lttng_ht_lookup(relayd_session_id_ht,
- (void *)((unsigned long) relayd->sessiond_session_id), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node == NULL) {
- /* We assume the relayd is being or is destroyed */
- return;
- }
-
- /*
- * Try to delete it from the relayd session id ht. The return value is of
- * no importance since either way we are going to try to delete the relayd
- * from the global relayd_ht.
- */
- lttng_ht_del(relayd_session_id_ht, &iter);
-
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
if (ret != 0) {
}
lttng_ht_destroy(consumer_data.relayd_ht);
- /* The destroy_relayd call makes sure that this ht is empty here. */
- lttng_ht_destroy(relayd_session_id_ht);
rcu_read_unlock();
}
* this next value, 1 should always be substracted in order to compare
* the last seen sequence number on the relayd side to the last sent.
*/
- data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
goto error;
}
+ ++stream->next_net_seq_num;
+
/* Set to go on data socket */
outfd = relayd->data_sock.fd;
}
if (stream == NULL) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
- continue;
+ goto restart;
}
DBG("Adding metadata stream %d to poll set",
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
- size_t pipe_readlen;
+ ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
/*
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
unsigned int sessiond_id)
{
- int fd = -1, ret = -1, relayd_created = 0;
+ int fd = -1, ret = -1, relayd_created = 0, sock_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
- struct consumer_relayd_session_id *relayd_id_node;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
/* Assign new file descriptor */
relayd->control_sock.fd = fd;
+ /* Flag that we have successfully created a socket with a valid fd. */
+ sock_created = 1;
/*
* Create a session on the relayd and store the returned id. Lock the
goto error;
}
- /* Set up a relayd session id node. */
- relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
- if (!relayd_id_node) {
- PERROR("zmalloc relayd id node");
- ret = -1;
- goto error;
- }
-
- relayd_id_node->relayd_id = relayd->relayd_session_id;
- relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
-
- /* Indexed by session id of the sessiond. */
- lttng_ht_node_init_ulong(&relayd_id_node->node,
- relayd_id_node->sessiond_id);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
- rcu_read_unlock();
-
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
/* Assign new file descriptor */
relayd->data_sock.fd = fd;
+ /* Flag that we have successfully created a socket with a valid fd. */
+ sock_created = 1;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
}
if (relayd_created) {
- /* We just want to cleanup. Ignore ret value. */
- (void) relayd_close(&relayd->control_sock);
- (void) relayd_close(&relayd->data_sock);
+ if (sock_created) {
+ /* We just want to close the fd for cleanup. Ignore ret value. */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
+ }
free(relayd);
}