summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1: f21dae4)
This ensures the validity of the channel's metadata stream before
closing it or pushing metadata onto it.
Fixes #536
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
/*
* Lock to update the metadata cache and push into the ring_buffer
* (ustctl_write_metadata_to_channel).
/*
* Lock to update the metadata cache and push into the ring_buffer
* (ustctl_write_metadata_to_channel).
+ *
+ * This is nested INSIDE the consumer_data lock.
*/
pthread_mutex_t lock;
};
*/
pthread_mutex_t lock;
};
+ /*
+ * Nullify the stream reference so it is not used after deletion. The
+ * consumer data lock MUST be acquired before being able to check for a
+ * NULL pointer value.
+ */
+ stream->chan->metadata_stream = NULL;
+
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
* Lock to use the stream FDs since they are used between threads.
*
* This is nested INSIDE the consumer_data lock.
* Lock to use the stream FDs since they are used between threads.
*
* This is nested INSIDE the consumer_data lock.
+ * This is nested INSIDE the metadata cache lock.
* This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
* This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
DBG("UST consumer writing metadata to channel %s", metadata->name);
DBG("UST consumer writing metadata to channel %s", metadata->name);
+ if (!metadata->metadata_stream) {
+ ret = 0;
+ goto error;
+ }
+
assert(target_offset <= metadata->metadata_cache->max_offset);
ret = ustctl_write_metadata_to_channel(metadata->uchan,
metadata_str + target_offset, len);
assert(target_offset <= metadata->metadata_cache->max_offset);
ret = ustctl_write_metadata_to_channel(metadata->uchan,
metadata_str + target_offset, len);
}
pthread_mutex_lock(&consumer_data.lock);
}
pthread_mutex_lock(&consumer_data.lock);
- if (!cds_lfht_is_node_deleted(&channel->node.node)) {
- if (channel->switch_timer_enabled == 1) {
- DBG("Deleting timer on metadata channel");
- consumer_timer_switch_stop(channel);
- }
+
+ if (cds_lfht_is_node_deleted(&channel->node.node)) {
+ goto error_unlock;
+ }
+
+ if (channel->switch_timer_enabled == 1) {
+ DBG("Deleting timer on metadata channel");
+ consumer_timer_switch_stop(channel);
+ }
+
+ if (channel->metadata_stream) {
ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
if (ret < 0) {
ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
if (ret < 0) {
ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+ /*
+ * XXX: The consumer data lock is acquired before calling metadata cache
+ * write which calls push metadata that MUST be protected by the consumer
+ * lock in order to be able to check the validity of the metadata stream of
+ * the channel.
+ *
+ * Note that this will be subject to change to better fine grained locking
+ * and ultimately try to get rid of this global consumer data lock.
+ */
+ pthread_mutex_lock(&consumer_data.lock);
+
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
while (consumer_metadata_cache_flushed(channel, offset + len)) {
DBG("Waiting for metadata to be flushed");
while (consumer_metadata_cache_flushed(channel, offset + len)) {
DBG("Waiting for metadata to be flushed");