X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=b5e212f27d61c5f08f53ef32fb19ac91c1e55917;hb=46e6455f9dbe3bbe9b39f9e7b55dde228f6e3dbd;hp=f4cfa82ce8dcc1179843a806a4d969f432b5391e;hpb=c5b6f4f08fe8d1abff74c7f6ad3630b7dcf0669d;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index f4cfa82ce..b5e212f27 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -63,6 +63,20 @@ volatile int consumer_quit; static struct lttng_ht *metadata_ht; static struct lttng_ht *data_ht; +/* + * This hash table contains the mapping between the session id of the sessiond + * and the relayd session id. Element of the ht are indexed by sessiond_id. + * + * Node can be added when a relayd communication is opened in the sessiond + * thread. + * + * Note that a session id of the session daemon is unique to a tracing session + * and not to a domain session. However, a domain session has one consumer + * which forces the 1-1 mapping between a consumer and a domain session (ex: + * UST). This means that we can't have duplicate in this ht. + */ +static struct lttng_ht *relayd_session_id_ht; + /* * Notify a thread pipe to poll back again. This usually means that some global * state has changed so we just send back the thread in a poll wait call. @@ -216,6 +230,7 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) { int ret; struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; if (relayd == NULL) { return; @@ -223,6 +238,20 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) DBG("Consumer destroy and close relayd socket pair"); + lttng_ht_lookup(relayd_session_id_ht, + (void *)((unsigned long) relayd->session_id), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + /* We assume the relayd is being or is destroyed */ + return; + } + + ret = lttng_ht_del(relayd_session_id_ht, &iter); + if (ret != 0) { + /* We assume the relayd is being or is destroyed */ + return; + } + iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); if (ret != 0) { @@ -2666,6 +2695,7 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } /* @@ -2676,11 +2706,13 @@ void lttng_consumer_init(void) */ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) + struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, + unsigned int sessiond_id) { int fd = -1, ret = -1; enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; + struct consumer_relayd_session_id *relayd_id_node; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); @@ -2754,6 +2786,23 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } + /* Set up a relayd session id node. */ + relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id)); + if (!relayd_id_node) { + PERROR("zmalloc relayd id node"); + goto error; + } + + relayd_id_node->relayd_id = relayd->session_id; + relayd_id_node->sessiond_id = (uint64_t) sessiond_id; + + /* Indexed by session id of the session daemon. */ + lttng_ht_node_init_ulong(&relayd_id_node->node, + relayd_id_node->sessiond_id); + rcu_read_lock(); + lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node); + rcu_read_unlock(); + break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */