rcu_read_unlock();
}
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
- uint64_t key)
-{
- ssize_t ret;
-
- do {
- ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- PERROR("Failed to write to the channel rotation pipe");
- } else {
- DBG("Sent channel rotation notification for channel key %"
- PRIu64, key);
- ret = 0;
- }
-
- return (int) ret;
-}
-
/*
* Perform operations that need to be done after a stream has
* rotated and released the stream lock.
abort();
}
- if (--stream->chan->nr_stream_rotate_pending == 0) {
- DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
- stream->chan->name);
- ret = rotate_notify_sessiond(ctx, stream->chan->key);
- }
pthread_mutex_unlock(&stream->chan->lock);
-
return ret;
}
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
- fd = -1; /* For error path */
/* Assign version values. */
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
- fd = -1; /* for eventual error paths */
/* Assign version values. */
relayd->data_sock.major = relayd_sock->major;
relayd->data_sock.minor = relayd_sock->minor;
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
+ /*
+ * We gave the ownership of the fd to the relayd structure. Set the
+ * fd to -1 so we don't call close() on it in the error path below.
+ */
+ fd = -1;
/* We successfully added the socket. Send status back. */
ret = consumer_send_status_msg(sock, ret_code);
}
}
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
-
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret) {
- /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
- ret = 0;
- goto end;
- }
-
- ret = 1;
-
-end:
- return ret;
-}
-
/*
* Search for a relayd associated to the session id and return the reference.
*
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
- relayd = find_relayd_by_session_id(id);
- if (relayd) {
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_begin_data_pending(&relayd->control_sock,
- relayd->relayd_session_id);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- /* Communication error thus the relayd so no data pending. */
- ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
- goto data_not_pending;
- }
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&id, lttng_ht_seed),
ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
- /* If this call fails, the stream is being used hence data pending. */
- ret = stream_try_lock(stream);
- if (!ret) {
- goto data_pending;
- }
+ pthread_mutex_lock(&stream->lock);
/*
* A removed node from the hash table indicates that the stream has
}
}
- /* Relayd check */
- if (relayd) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ if (ret < 0) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
- if (ret < 0) {
+
+ if (ret == 1) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ goto data_pending;
+ } else if (ret < 0) {
ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- pthread_mutex_unlock(&stream->lock);
goto data_not_pending;
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
- goto data_pending;
- }
}
- pthread_mutex_unlock(&stream->lock);
- }
- if (relayd) {
- unsigned int is_data_inflight = 0;
-
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Send end command for data pending. */
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
* is already at the rotate position (produced == consumed), we flag it as
* ready for rotation. The rotation of ready streams occurs after we have
* replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_rotate_channel(uint64_t key, const char *path,
- uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+ uint64_t key, const char *path, uint64_t relayd_id,
+ uint32_t metadata, uint64_t new_chunk_id,
struct lttng_consumer_local_data *ctx)
{
int ret;
- struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
rcu_read_lock();
- channel = consumer_find_channel(key);
- if (!channel) {
- ERR("No channel found for key %" PRIu64, key);
- ret = -1;
- goto end;
- }
-
pthread_mutex_lock(&channel->lock);
channel->current_chunk_id = new_chunk_id;
if (consumed_pos == stream->rotate_position) {
stream->rotate_ready = true;
}
- channel->nr_stream_rotate_pending++;
ret = consumer_flush_buffer(stream, 1);
if (ret < 0) {
} else {
ret = rotate_local_stream(ctx, stream);
}
+ stream->trace_archive_id++;
if (ret < 0) {
- ERR("Rotate stream");
+ ERR("Failed to rotate stream, ret = %i", ret);
goto error;
}
* This is especially important for low throughput streams that have already
* been consumed, we cannot wait for their next packet to perform the
* rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_rotate_ready_streams(uint64_t key,
- struct lttng_consumer_local_data *ctx)
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+ uint64_t key, struct lttng_consumer_local_data *ctx)
{
int ret;
- struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
- channel = consumer_find_channel(key);
- if (!channel) {
- ERR("No channel found for key %" PRIu64, key);
- ret = -1;
- goto end;
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
}
}
-int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+/* Stream lock must be acquired by the caller. */
+static
+bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream,
+ uint64_t session_id, uint64_t chunk_id)
+{
+ bool pending = false;
+
+ if (stream->session_id != session_id) {
+ /* Skip. */
+ goto end;
+ }
+
+ /*
+ * If the stream's archive_id belongs to the chunk being rotated (or an
+ * even older one), it means that the consumer has not consumed all the
+ * buffers that belong to the chunk being rotated. Therefore, the
+ * rotation is considered as ongoing/pending.
+ */
+ pending = stream->trace_archive_id <= chunk_id;
+end:
+ return pending;
+}
+
+/* RCU read lock must be acquired by the caller. */
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+ uint64_t chunk_id)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ bool rotation_pending = false;
+
+ /* Start with the metadata streams... */
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ pthread_mutex_lock(&stream->lock);
+ rotation_pending = check_stream_rotation_pending(stream,
+ session_id, chunk_id);
+ pthread_mutex_unlock(&stream->lock);
+ if (rotation_pending) {
+ goto end;
+ }
+ }
+
+ /* ... followed by the data streams. */
+ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ pthread_mutex_lock(&stream->lock);
+ rotation_pending = check_stream_rotation_pending(stream,
+ session_id, chunk_id);
+ pthread_mutex_unlock(&stream->lock);
+ if (rotation_pending) {
+ goto end;
+ }
+ }
+
+end:
+ return !!rotation_pending;
+}
+
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
uint64_t relayd_id, uint64_t chunk_id)
{
int ret;
relayd = consumer_find_relayd(relayd_id);
if (!relayd) {
- ERR("Failed to find relayd");
+ ERR("Failed to find relayd id %" PRIu64, relayd_id);
ret = -1;
goto end;
}