* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <assert.h>
#include <common/common.h>
#include <common/utils.h>
+#include <common/compat/endian.h>
#include "lttng-relayd.h"
#include "stream.h"
#include "index.h"
+#include "connection.h"
/*
* Allocate a new relay index object. Pass the stream in which it is
lttng_ht_node_init_u64(&index->index_n, net_seq_num);
pthread_mutex_init(&index->lock, NULL);
- pthread_mutex_init(&index->reflock, NULL);
urcu_ref_init(&index->ref);
end:
*/
static bool relay_index_get(struct relay_index *index)
{
- bool has_ref = false;
-
DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
index->stream->stream_handle, index->index_n.key,
(int) index->ref.refcount);
- /* Confirm that the index refcount has not reached 0. */
- pthread_mutex_lock(&index->reflock);
- if (index->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&index->ref);
- }
- pthread_mutex_unlock(&index->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&index->ref);
}
/*
index = relay_index_create(stream, net_seq_num);
if (!index) {
ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
- index->stream->stream_handle, net_seq_num);
+ stream->stream_handle, net_seq_num);
goto end;
}
oldindex = relay_index_add_unique(stream, index);
end:
rcu_read_unlock();
DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
- (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num);
+ (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
return index;
}
-int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+int relay_index_set_file(struct relay_index *index,
+ struct lttng_index_file *index_file,
uint64_t data_offset)
{
int ret = 0;
pthread_mutex_lock(&index->lock);
- if (index->index_fd) {
+ if (index->index_file) {
ret = -1;
goto end;
}
- stream_fd_get(index_fd);
- index->index_fd = index_fd;
+ lttng_index_file_get(index_file);
+ index->index_file = index_file;
index->index_data.offset = data_offset;
end:
pthread_mutex_unlock(&index->lock);
int ret;
struct lttng_ht_iter iter;
- if (index->index_fd) {
- stream_fd_put(index->index_fd);
- index->index_fd = NULL;
+ if (index->index_file) {
+ lttng_index_file_put(index->index_file);
+ index->index_file = NULL;
}
if (index->in_hash_table) {
/* Delete index from hash table. */
* Index lock ensures that concurrent test and update of stream
* ref is atomic.
*/
- pthread_mutex_lock(&index->reflock);
assert(index->ref.refcount != 0);
urcu_ref_put(&index->ref, index_release);
- pthread_mutex_unlock(&index->reflock);
rcu_read_unlock();
}
goto skip;
}
/* Check if we are ready to flush. */
- if (!index->has_index_data || !index->index_fd) {
+ if (!index->has_index_data || !index->index_file) {
goto skip;
}
- fd = index->index_fd->fd;
+ fd = index->index_file->fd;
DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
" on fd %d", index->stream->stream_handle,
index->index_n.key, fd);
flushed = true;
index->flushed = true;
- ret = index_write(fd, &index->index_data, sizeof(index->index_data));
- if (ret == sizeof(index->index_data)) {
- ret = 0;
- } else {
- ret = -1;
- }
+ ret = lttng_index_file_write(index->index_file, &index->index_data);
skip:
pthread_mutex_unlock(&index->lock);
rcu_read_lock();
cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
index, index_n.node) {
- if (!index->index_fd) {
+ if (!index->index_file) {
continue;
}
/*
- * Partial index has its index_fd: we have only
+ * Partial index has its index_file: we have only
* received its info from the data socket.
* Put self-ref from index.
*/
rcu_read_unlock();
return net_seq_num;
}
+
+/*
+ * Update the index file of an already existing relay_index.
+ * Offsets by 'removed_data_count' the offset field of an index.
+ */
+static
+int relay_index_switch_file(struct relay_index *index,
+ struct lttng_index_file *new_index_file,
+ uint64_t removed_data_count)
+{
+ int ret = 0;
+ uint64_t offset;
+
+ pthread_mutex_lock(&index->lock);
+ if (!index->index_file) {
+ ERR("No index_file");
+ ret = 0;
+ goto end;
+ }
+
+ lttng_index_file_put(index->index_file);
+ lttng_index_file_get(new_index_file);
+ index->index_file = new_index_file;
+ offset = be64toh(index->index_data.offset);
+ index->index_data.offset = htobe64(offset - removed_data_count);
+
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
+}
+
+/*
+ * Switch the index file of all pending indexes for a stream and update the
+ * data offset by substracting the last safe position.
+ * Stream lock must be held.
+ */
+int relay_index_switch_all_files(struct relay_stream *stream)
+{
+ struct lttng_ht_iter iter;
+ struct relay_index *index;
+ int ret = 0;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+ index, index_n.node) {
+ DBG("Update index to fd %d", stream->index_file->fd);
+ ret = relay_index_switch_file(index, stream->index_file,
+ stream->pos_after_last_complete_data_index);
+ if (ret) {
+ goto end;
+ }
+ }
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Set index data from the control port to a given index object.
+ */
+int relay_index_set_control_data(struct relay_index *index,
+ const struct lttcomm_relayd_index *data,
+ unsigned int minor_version)
+{
+ /* The index on disk is encoded in big endian. */
+ const struct ctf_packet_index index_data = {
+ .packet_size = htobe64(data->packet_size),
+ .content_size = htobe64(data->content_size),
+ .timestamp_begin = htobe64(data->timestamp_begin),
+ .timestamp_end = htobe64(data->timestamp_end),
+ .events_discarded = htobe64(data->events_discarded),
+ .stream_id = htobe64(data->stream_id),
+ };
+
+ if (minor_version >= 8) {
+ index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
+ index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+ } else {
+ uint64_t unset_value = -1ULL;
+
+ index->index_data.stream_instance_id = htobe64(unset_value);
+ index->index_data.packet_seq_num = htobe64(unset_value);
+ }
+
+ return relay_index_set_data(index, &index_data);
+}