* RCU read-side lock needs to be held across entire usage of the
returned structure pointer, not just the lookup.
* Moving refcount inc/dec to uatomic ops. Theoretically not 100%
required for now, but won't hurt when we move to multithread.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
}
/* Check and cleanup relayd */
}
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
- /* We are about to modify the relayd refcount */
- rcu_read_lock();
- if (!--relayd->refcount) {
+ uatomic_dec(&relayd->refcount);
+ assert(uatomic_read(&relayd->refcount) >= 0);
+ if (uatomic_read(&relayd->refcount) == 0) {
/* Refcount of the relayd struct is 0, destroy it */
consumer_destroy_relayd(relayd);
}
/* Refcount of the relayd struct is 0, destroy it */
consumer_destroy_relayd(relayd);
}
if (!--stream->chan->refcount) {
free_chan = stream->chan;
if (!--stream->chan->refcount) {
free_chan = stream->chan;
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
lttng_ht_lookup(consumer_data.stream_ht,
(void *)((unsigned long) stream->key), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
lttng_ht_lookup(consumer_data.stream_ht,
(void *)((unsigned long) stream->key), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
}
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
}
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
- /* We are about to modify the relayd refcount */
- rcu_read_lock();
- relayd->refcount++;
- rcu_read_unlock();
+ uatomic_inc(&relayd->refcount);
/* Update consumer data */
consumer_data.stream_count++;
/* Update consumer data */
consumer_data.stream_count++;
* Find a relayd socket pair in the global consumer data.
*
* Return the object if found else NULL.
* Find a relayd socket pair in the global consumer data.
*
* Return the object if found else NULL.
+ * RCU read-side lock must be held across this call and while using the
+ * returned object.
*/
struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
{
*/
struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
{
lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
}
relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
}
/* Reset data header */
memset(&data_hdr, 0, sizeof(data_hdr));
/* Reset data header */
memset(&data_hdr, 0, sizeof(data_hdr));
/* Get relayd reference of the stream. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
/* Get relayd reference of the stream. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
- /* RCU lock for the relayd pointer */
- rcu_read_lock();
-
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
/*
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
/*
uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
- /* RCU lock for the relayd pointer */
- rcu_read_lock();
-
/* Write metadata stream id before payload */
if (stream->metadata_flag && relayd) {
/*
/* Write metadata stream id before payload */
if (stream->metadata_flag && relayd) {
/*
+ /* relayd needs RCU read-side protection */
+ rcu_read_lock();
+
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
ret = write(ctx->consumer_poll_pipe[1], "", 1);
} while (ret < 0 && errno == EINTR);
end_nosignal:
ret = write(ctx->consumer_poll_pipe[1], "", 1);
} while (ret < 0 && errno == EINTR);
end_nosignal:
uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
+ ERR("Cannot find relay for network stream\n");
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
+ /* relayd need RCU read-side lock */
+ rcu_read_lock();
+
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
ret = write(ctx->consumer_poll_pipe[1], "", 1);
} while (ret < 0 && errno == EINTR);
end_nosignal:
ret = write(ctx->consumer_poll_pipe[1], "", 1);
} while (ret < 0 && errno == EINTR);
end_nosignal: