X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.cpp;h=13c18325d75d363e5da76674734b4e3aedcd0c07;hb=56047f5a23df5c2c583a102b8015bbec5a7da9f1;hp=17d2248bec8f9f2c35a1bb6e90f92b694867335c;hpb=28ab034a2c3582d07d3423d2d746731f87d3969f;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.cpp b/src/bin/lttng-sessiond/consumer.cpp index 17d2248be..13c18325d 100644 --- a/src/bin/lttng-sessiond/consumer.cpp +++ b/src/bin/lttng-sessiond/consumer.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -83,7 +84,7 @@ char *setup_channel_trace_path(struct consumer_output *consumer, return pathname; error: free(pathname); - return NULL; + return nullptr; } /* @@ -291,18 +292,17 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) /* Destroy any relayd connection */ if (consumer->type == CONSUMER_DST_NET) { - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - int ret; + /* Send destroy relayd command. */ + const int ret = consumer_send_destroy_relayd(socket, consumer); - /* Send destroy relayd command */ - ret = consumer_send_destroy_relayd(socket, consumer); if (ret < 0) { DBG("Unable to send destroy relayd command to consumer"); /* Continue since we MUST delete everything at this point. */ } } - rcu_read_unlock(); } } @@ -319,7 +319,9 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o LTTNG_ASSERT(data); - if (output == NULL || data->cmd_sock < 0) { + lttng::urcu::read_lock_guard read_lock; + + if (output == nullptr || data->cmd_sock < 0) { /* * Not an error. Possible there is simply not spawned consumer or it's * disabled for the tracing session asking the socket. @@ -327,21 +329,17 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o goto error; } - rcu_read_lock(); socket = consumer_find_socket(data->cmd_sock, output); - rcu_read_unlock(); - if (socket == NULL) { + if (socket == nullptr) { socket = consumer_allocate_socket(&data->cmd_sock); - if (socket == NULL) { + if (socket == nullptr) { ret = -1; goto error; } socket->registered = 0; socket->lock = &data->lock; - rcu_read_lock(); consumer_add_socket(socket, output); - rcu_read_unlock(); } socket->type = data->type; @@ -363,7 +361,7 @@ struct consumer_socket *consumer_find_socket_by_bitness(int bits, const struct consumer_output *consumer) { int consumer_fd; - struct consumer_socket *socket = NULL; + struct consumer_socket *socket = nullptr; ASSERT_RCU_READ_LOCKED(); @@ -397,18 +395,18 @@ struct consumer_socket *consumer_find_socket(int key, const struct consumer_outp { struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; - struct consumer_socket *socket = NULL; + struct consumer_socket *socket = nullptr; ASSERT_RCU_READ_LOCKED(); /* Negative keys are lookup failures */ - if (key < 0 || consumer == NULL) { - return NULL; + if (key < 0 || consumer == nullptr) { + return nullptr; } lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); - if (node != NULL) { + if (node != nullptr) { socket = lttng::utils::container_of(node, &consumer_socket::node); } @@ -420,12 +418,12 @@ struct consumer_socket *consumer_find_socket(int key, const struct consumer_outp */ struct consumer_socket *consumer_allocate_socket(int *fd) { - struct consumer_socket *socket = NULL; + struct consumer_socket *socket = nullptr; LTTNG_ASSERT(fd); socket = zmalloc(); - if (socket == NULL) { + if (socket == nullptr) { PERROR("zmalloc consumer socket"); goto error; } @@ -510,16 +508,16 @@ void consumer_destroy_socket(struct consumer_socket *sock) */ struct consumer_output *consumer_create_output(enum consumer_dst_type type) { - struct consumer_output *output = NULL; + struct consumer_output *output = nullptr; output = zmalloc(); - if (output == NULL) { + if (output == nullptr) { PERROR("zmalloc consumer_output"); goto error; } /* By default, consumer output is enabled */ - output->enabled = 1; + output->enabled = true; output->type = type; output->net_seq_index = (uint64_t) -1ULL; urcu_ref_init(&output->ref); @@ -544,12 +542,14 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) { - consumer_del_socket(socket, obj); - consumer_destroy_socket(socket); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) { + consumer_del_socket(socket, obj); + consumer_destroy_socket(socket); + } } - rcu_read_unlock(); } /* @@ -599,7 +599,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *src) LTTNG_ASSERT(src); output = consumer_create_output(src->type); - if (output == NULL) { + if (output == nullptr) { goto end; } output->enabled = src->enabled; @@ -619,7 +619,7 @@ end: error_put: consumer_output_put(output); - return NULL; + return nullptr; } /* @@ -636,32 +636,33 @@ int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *s LTTNG_ASSERT(dst); LTTNG_ASSERT(src); - rcu_read_lock(); - cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) { - /* Ignore socket that are already there. */ - copy_sock = consumer_find_socket(*socket->fd_ptr, dst); - if (copy_sock) { - continue; - } + { + lttng::urcu::read_lock_guard read_lock; - /* Create new socket object. */ - copy_sock = consumer_allocate_socket(socket->fd_ptr); - if (copy_sock == NULL) { - rcu_read_unlock(); - ret = -ENOMEM; - goto error; - } + cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) { + /* Ignore socket that are already there. */ + copy_sock = consumer_find_socket(*socket->fd_ptr, dst); + if (copy_sock) { + continue; + } - copy_sock->registered = socket->registered; - /* - * This is valid because this lock is shared accross all consumer - * object being the global lock of the consumer data structure of the - * session daemon. - */ - copy_sock->lock = socket->lock; - consumer_add_socket(copy_sock, dst); + /* Create new socket object. */ + copy_sock = consumer_allocate_socket(socket->fd_ptr); + if (copy_sock == nullptr) { + ret = -ENOMEM; + goto error; + } + + copy_sock->registered = socket->registered; + /* + * This is valid because this lock is shared accross all consumer + * object being the global lock of the consumer data structure of the + * session daemon. + */ + copy_sock->lock = socket->lock; + consumer_add_socket(copy_sock, dst); + } } - rcu_read_unlock(); error: return ret; @@ -678,7 +679,7 @@ int consumer_set_network_uri(const struct ltt_session *session, struct lttng_uri *uri) { int ret; - struct lttng_uri *dst_uri = NULL; + struct lttng_uri *dst_uri = nullptr; /* Code flow error safety net. */ LTTNG_ASSERT(output); @@ -1268,33 +1269,35 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; msg.u.data_pending.session_id = session_id; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto error_unlock; - } + { + /* Send command for each consumer. */ + lttng::urcu::read_lock_guard read_lock; - /* - * No need for a recv reply status because the answer to the command is - * the reply status message. - */ + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_unlock; + } + + /* + * No need for a recv reply status because the answer to the command is + * the reply status message. + */ + ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_unlock; + } - ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); - if (ret < 0) { pthread_mutex_unlock(socket->lock); - goto error_unlock; - } - pthread_mutex_unlock(socket->lock); - if (ret_code == 1) { - break; + if (ret_code == 1) { + break; + } } } - rcu_read_unlock(); DBG("Consumer data is %s pending for session id %" PRIu64, ret_code == 1 ? "" : "NOT", @@ -1302,7 +1305,6 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum return ret_code; error_unlock: - rcu_read_unlock(); return -1; } @@ -1580,35 +1582,41 @@ int consumer_get_discarded_events(uint64_t session_id, *discarded = 0; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - uint64_t consumer_discarded = 0; - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } + /* Send command for each consumer. */ + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + uint64_t consumer_discarded = 0; + + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv( + socket, &consumer_discarded, sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded)); - if (ret < 0) { - ERR("get discarded events"); pthread_mutex_unlock(socket->lock); - goto end; + *discarded += consumer_discarded; } - pthread_mutex_unlock(socket->lock); - *discarded += consumer_discarded; } + ret = 0; DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id); end: - rcu_read_unlock(); return ret; } @@ -1636,35 +1644,38 @@ int consumer_get_lost_packets(uint64_t session_id, *lost = 0; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - uint64_t consumer_lost = 0; - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } + /* Send command for each consumer. */ + { + lttng::urcu::read_lock_guard read_lock; - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); - if (ret < 0) { - ERR("get lost packets"); + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } pthread_mutex_unlock(socket->lock); - goto end; + *lost += consumer_lost; } - pthread_mutex_unlock(socket->lock); - *lost += consumer_lost; } + ret = 0; DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id); end: - rcu_read_unlock(); return ret; } @@ -1811,8 +1822,8 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, int ret; enum lttng_trace_chunk_status chunk_status; struct lttng_credentials chunk_credentials; - const struct lttng_directory_handle *chunk_directory_handle = NULL; - struct lttng_directory_handle *domain_handle = NULL; + const struct lttng_directory_handle *chunk_directory_handle = nullptr; + struct lttng_directory_handle *domain_handle = nullptr; int domain_dirfd; const char *chunk_name; bool chunk_name_overridden;