Send indexes in streaming mode
authorJulien Desfossez <jdesfossez@efficios.com>
Mon, 19 Aug 2013 15:35:43 +0000 (11:35 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 27 Sep 2013 16:18:04 +0000 (12:18 -0400)
To write an index on disk, we need to know all the fields sent by the
RELAY_SEND_INDEX command as well as the offset of the data in the
tracefile.

Since the control and data connection are not synchronized, this process
happens in two separate steps synchronized by an HT indexed by the
index_handle and the net_seq_num.

When we receive data and when we receive an index, we lookup in the HT
if an entry already exists. If it does, it means that we only need to
fill the fields we just received and write the index on disk, otherwise,
we allocate a new index, set the fields we know and store it in the HT.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
17 files changed:
include/lttng/lttng-error.h
src/bin/lttng-relayd/Makefile.am
src/bin/lttng-relayd/index.c [new file with mode: 0644]
src/bin/lttng-relayd/index.h [new file with mode: 0644]
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c
src/common/consumer-stream.c
src/common/consumer-stream.h
src/common/consumer.c
src/common/defaults.h
src/common/index/index.c
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 5e0dd07b272027b9b50539d6a17c6d73f2f362cc..b43c88fbe3de578462f83f1860a3f385d89528f0 100644 (file)
@@ -82,7 +82,7 @@ enum lttng_error_code {
        LTTNG_ERR_KERN_STREAM_FAIL       = 49,  /* Kernel create stream failed */
        LTTNG_ERR_START_SESSION_ONCE     = 50,  /* Session needs to be started once. */
        LTTNG_ERR_SNAPSHOT_FAIL          = 51,  /* Snapshot record failed. */
-       /* 52 */
+       LTTNG_ERR_NO_STREAM              = 52,  /* Index without stream on relay. */
        LTTNG_ERR_KERN_LIST_FAIL         = 53,  /* Kernel listing events failed */
        LTTNG_ERR_UST_CALIBRATE_FAIL     = 54,  /* UST calibration failed */
        LTTNG_ERR_UST_EVENT_ENABLED      = 55,  /* UST event already enabled. */
index ed8214429a48310337f8289f066347b5bd2bdfa9..853c21ae989014dd5bc8dab795a3a9961b13daac 100644 (file)
@@ -7,6 +7,7 @@ AM_CFLAGS = -fno-strict-aliasing
 bin_PROGRAMS = lttng-relayd
 
 lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
+                       index.c index.h \
                        cmd-generic.c cmd-generic.h \
                        cmd-2-1.c cmd-2-1.h \
                        cmd-2-2.c cmd-2-2.h
@@ -17,4 +18,5 @@ lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
                $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \
                $(top_builddir)/src/common/hashtable/libhashtable.la \
                $(top_builddir)/src/common/libcommon.la \
-               $(top_builddir)/src/common/compat/libcompat.la
+               $(top_builddir)/src/common/compat/libcompat.la \
+               $(top_builddir)/src/common/index/libindex.la
diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
new file mode 100644 (file)
index 0000000..97918cd
--- /dev/null
@@ -0,0 +1,218 @@
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
+ *                      David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#include <assert.h>
+
+#include <common/common.h>
+#include <common/utils.h>
+
+#include "index.h"
+
+/*
+ * Deferred free of a relay index object. MUST only be called by a call RCU.
+ */
+static void deferred_free_relay_index(struct rcu_head *head)
+{
+       struct relay_index *index =
+               caa_container_of(head, struct relay_index, rcu_node);
+
+       if (index->to_close_fd >= 0) {
+               int ret;
+
+               ret = close(index->to_close_fd);
+               if (ret < 0) {
+                       PERROR("Relay index to close fd %d", index->to_close_fd);
+               }
+       }
+
+       relay_index_free(index);
+}
+
+/*
+ * Allocate a new relay index object using the given stream ID and sequence
+ * number as the hash table key.
+ *
+ * Return allocated object or else NULL on error.
+ */
+struct relay_index *relay_index_create(uint64_t stream_id,
+               uint64_t net_seq_num)
+{
+       struct relay_index *index;
+
+       DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
+                       stream_id, net_seq_num);
+
+       index = zmalloc(sizeof(*index));
+       if (index == NULL) {
+               PERROR("Relay index zmalloc");
+               goto error;
+       }
+
+       index->to_close_fd = -1;
+       lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
+
+error:
+       return index;
+}
+
+/*
+ * Find a relayd index in the given hash table.
+ *
+ * Return index object or else NULL on error.
+ */
+struct relay_index *relay_index_find(uint64_t stream_id,
+               uint64_t net_seq_num, struct lttng_ht *ht)
+{
+       struct lttng_ht_node_two_u64 *node;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_two_u64 key;
+       struct relay_index *index = NULL;
+
+       assert(ht);
+
+       DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
+                       stream_id, net_seq_num);
+
+       key.key1 = stream_id;
+       key.key2 = net_seq_num;
+
+       lttng_ht_lookup(ht, (void *)(&key), &iter);
+       node = lttng_ht_iter_get_node_two_u64(&iter);
+       if (node == NULL) {
+               goto end;
+       }
+       index = caa_container_of(node, struct relay_index, index_n);
+
+end:
+       DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
+                       (index == NULL) ? "NOT " : "", stream_id, net_seq_num);
+       return index;
+}
+
+/*
+ * Add unique relay index to the given hash table. In case of a collision, the
+ * already existing object is put in the given _index variable.
+ *
+ * RCU read side lock MUST be acquired.
+ */
+void relay_index_add(struct relay_index *index, struct lttng_ht *ht,
+               struct relay_index **_index)
+{
+       struct cds_lfht_node *node_ptr;
+
+       assert(index);
+       assert(ht);
+       assert(_index);
+
+       DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
+                       index->key.key1, index->key.key2);
+
+       node_ptr = cds_lfht_add_unique(ht->ht,
+                       ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
+                       ht->match_fct, (void *) &index->index_n.key,
+                       &index->index_n.node);
+       if (node_ptr != &index->index_n.node) {
+               *_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
+       }
+}
+
+/*
+ * Write index on disk to the given fd. Once done error or not, it is removed
+ * from the hash table and destroy the object.
+ *
+ * MUST be called with a RCU read side lock held.
+ *
+ * Return 0 on success else a negative value.
+ */
+int relay_index_write(int fd, struct relay_index *index, struct lttng_ht *ht)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+
+       DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
+                       " on fd %d", index->key.key1, index->key.key2, fd);
+
+       /* Delete index from hash table. */
+       iter.iter.node = &index->index_n.node;
+       ret = lttng_ht_del(ht, &iter);
+       assert(!ret);
+       call_rcu(&index->rcu_node, deferred_free_relay_index);
+
+       return index_write(fd, &index->index_data, sizeof(index->index_data));
+}
+
+/*
+ * Free the given index.
+ */
+void relay_index_free(struct relay_index *index)
+{
+       free(index);
+}
+
+/*
+ * Safely free the given index using a call RCU.
+ */
+void relay_index_free_safe(struct relay_index *index)
+{
+       if (!index) {
+               return;
+       }
+
+       call_rcu(&index->rcu_node, deferred_free_relay_index);
+}
+
+/*
+ * Delete index from the given hash table.
+ *
+ * RCU read side lock MUST be acquired.
+ */
+void relay_index_delete(struct relay_index *index, struct lttng_ht *ht)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+
+       DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
+                       "deleted.", index->key.key1, index->key.key2);
+
+       /* Delete index from hash table. */
+       iter.iter.node = &index->index_n.node;
+       ret = lttng_ht_del(ht, &iter);
+       assert(!ret);
+}
+
+/*
+ * Destroy every relay index with the given stream id as part of the key.
+ */
+void relay_index_destroy_by_stream_id(uint64_t stream_id, struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+
+       assert(ht);
+       assert(ht->ht);
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, index, index_n.node) {
+               if (index->key.key1 == stream_id) {
+                       relay_index_delete(index, ht);
+                       relay_index_free_safe(index);
+               }
+       }
+       rcu_read_unlock();
+}
diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
new file mode 100644 (file)
index 0000000..fc184e9
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
+ *                      David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef _RELAY_INDEX_H
+#define _RELAY_INDEX_H
+
+#include <inttypes.h>
+#include <pthread.h>
+
+#include <common/hashtable/hashtable.h>
+#include <common/index/index.h>
+
+struct relay_index {
+       /* FD on which to write the index data. */
+       int fd;
+       /*
+        * When destroying this object, this fd is checked and if valid, close it
+        * so this is basically a lazy close of the previous fd corresponding to
+        * the same stream id. This is used for the rotate file feature.
+        */
+       int to_close_fd;
+
+       /* Index packet data. This is the data that is written on disk. */
+       struct lttng_packet_index index_data;
+
+       /* key1 = stream_id, key2 = net_seq_num */
+       struct lttng_ht_two_u64 key;
+       struct lttng_ht_node_two_u64 index_n;
+       struct rcu_head rcu_node;
+       pthread_mutex_t mutex;
+};
+
+struct relay_index *relay_index_create(uint64_t stream_id,
+               uint64_t net_seq_num);
+struct relay_index *relay_index_find(uint64_t stream_id,
+               uint64_t net_seq_num, struct lttng_ht *ht);
+void relay_index_add(struct relay_index *index, struct lttng_ht *ht,
+               struct relay_index **_index);
+int relay_index_write(int fd, struct relay_index *index, struct lttng_ht *ht);
+void relay_index_free(struct relay_index *index);
+void relay_index_free_safe(struct relay_index *index);
+void relay_index_delete(struct relay_index *index, struct lttng_ht *ht);
+void relay_index_destroy_by_stream_id(uint64_t stream_id, struct lttng_ht *ht);
+
+#endif /* _RELAY_INDEX_H */
index 61db23a7752930ac5b351236f90d3e255181fb27..c60280e0fd9c717dc0bd0d1e975236ca0e73bed5 100644 (file)
@@ -23,6 +23,7 @@
 #include <urcu.h>
 #include <urcu/wfqueue.h>
 #include <common/hashtable/hashtable.h>
+#include <common/index/lttng-index.h>
 
 /*
  * Queue used to enqueue relay requests
@@ -60,6 +61,7 @@ struct relay_stream {
        struct relay_session *session;
        struct rcu_head rcu_node;
        int fd;
+       /* FD on which to write the index data. */
        int index_fd;
 
        char *path_name;
index bb038a670b0a763afd41b1343cbbb1e919fbe1f7..ca37b8bc41ed842111ee603ff2860e90b288b758 100644 (file)
@@ -52,6 +52,7 @@
 #include <common/utils.h>
 
 #include "cmd.h"
+#include "index.h"
 #include "utils.h"
 #include "lttng-relayd.h"
 
@@ -62,7 +63,6 @@ static struct lttng_uri *control_uri;
 static struct lttng_uri *data_uri;
 
 const char *progname;
-static int is_root;                    /* Set to 1 if the daemon is running as root */
 
 /*
  * Quit pipe for all threads. This permits a single cancellation point
@@ -98,6 +98,13 @@ static struct relay_cmd_queue relay_cmd_queue;
 static char *data_buffer;
 static unsigned int data_buffer_size;
 
+/* Global hash table that stores relay index object. */
+static struct lttng_ht *indexes_ht;
+
+/* We need those values for the file/dir creation. */
+static uid_t relayd_uid;
+static gid_t relayd_gid;
+
 /*
  * usage function on stderr
  */
@@ -758,6 +765,9 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
                                call_rcu(&stream->rcu_node,
                                        deferred_free_stream);
                        }
+                       /* Cleanup index of that stream. */
+                       relay_index_destroy_by_stream_id(stream->stream_handle,
+                                       indexes_ht);
                }
        }
        rcu_read_unlock();
@@ -765,6 +775,28 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
        free(cmd->session);
 }
 
+/*
+ * Copy index data from the control port to a given index object.
+ */
+static void copy_index_control_data(struct relay_index *index,
+               struct lttcomm_relayd_index *data)
+{
+       assert(index);
+       assert(data);
+
+       /*
+        * The index on disk is encoded in big endian, so we don't need to convert
+        * the data received on the network. The data_offset value is NEVER
+        * modified here and is updated by the data thread.
+        */
+       index->index_data.packet_size = data->packet_size;
+       index->index_data.content_size = data->content_size;
+       index->index_data.timestamp_begin = data->timestamp_begin;
+       index->index_data.timestamp_end = data->timestamp_end;
+       index->index_data.events_discarded = data->events_discarded;
+       index->index_data.stream_id = data->stream_id;
+}
+
 /*
  * Handle the RELAYD_CREATE_SESSION command.
  *
@@ -856,6 +888,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->stream_handle = ++last_relay_stream_id;
        stream->prev_seq = -1ULL;
        stream->session = session;
+       stream->index_fd = -1;
 
        ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
        if (ret < 0) {
@@ -868,7 +901,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
         * uses its own credentials for the stream files.
         */
        ret = utils_create_stream_file(stream->path_name, stream->channel_name,
-                       stream->tracefile_size, 0, -1, -1, NULL);
+                       stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
        if (ret < 0) {
                ERR("Create output file");
                goto end;
@@ -885,7 +918,8 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        lttng_ht_add_unique_ulong(streams_ht,
                        &stream->stream_n);
 
-       DBG("Relay new stream added %s", stream->channel_name);
+       DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
+                       stream->stream_handle);
 
 end:
        reply.handle = htobe64(stream->stream_handle);
@@ -969,6 +1003,13 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
                if (delret < 0) {
                        PERROR("close stream");
                }
+
+               if (stream->index_fd >= 0) {
+                       delret = close(stream->index_fd);
+                       if (delret < 0) {
+                               PERROR("close stream index_fd");
+                       }
+               }
                iter.iter.node = &stream->stream_n.node;
                delret = lttng_ht_del(streams_ht, &iter);
                assert(!delret);
@@ -1503,12 +1544,132 @@ end_no_session:
        return ret;
 }
 
+/*
+ * Receive an index for a specific stream.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_command *cmd, struct lttng_ht *streams_ht,
+               struct lttng_ht *indexes_ht)
+{
+       int ret, send_ret, index_created = 0;
+       struct relay_session *session = cmd->session;
+       struct lttcomm_relayd_index index_info;
+       struct relay_index *index, *wr_index = NULL;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+       uint64_t net_seq_num;
+
+       assert(cmd);
+       assert(streams_ht);
+       assert(indexes_ht);
+
+       DBG("Relay receiving index");
+
+       if (!session || cmd->version_check_done == 0) {
+               ERR("Trying to close a stream before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = cmd->sock->ops->recvmsg(cmd->sock, &index_info,
+                       sizeof(index_info), 0);
+       if (ret < sizeof(index_info)) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid index struct size : %d", ret);
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+
+       net_seq_num = be64toh(index_info.net_seq_num);
+
+       rcu_read_lock();
+       stream = relay_stream_from_stream_id(be64toh(index_info.relay_stream_id),
+                       streams_ht);
+       if (!stream) {
+               ret = -1;
+               goto end_rcu_unlock;
+       }
+
+       index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
+       if (!index) {
+               /* A successful creation will add the object to the HT. */
+               index = relay_index_create(stream->stream_handle, net_seq_num);
+               if (!index) {
+                       goto end_rcu_unlock;
+               }
+               index_created = 1;
+       }
+
+       copy_index_control_data(index, &index_info);
+
+       if (index_created) {
+               /*
+                * Try to add the relay index object to the hash table. If an object
+                * already exist, destroy back the index created, set the data in this
+                * object and write it on disk.
+                */
+               relay_index_add(index, indexes_ht, &wr_index);
+               if (wr_index) {
+                       copy_index_control_data(wr_index, &index_info);
+                       free(index);
+               }
+       } else {
+               /* The index already exists so write it on disk. */
+               wr_index = index;
+       }
+
+       /* Do we have a writable ready index to write on disk. */
+       if (wr_index) {
+               /* Starting at 2.4, create the index file if none available. */
+               if (cmd->minor >= 4 && stream->index_fd < 0) {
+                       ret = index_create_file(stream->path_name, stream->channel_name,
+                                       relayd_uid, relayd_gid, stream->tracefile_size,
+                                       stream->tracefile_count_current);
+                       if (ret < 0) {
+                               goto end_rcu_unlock;
+                       }
+                       stream->index_fd = ret;
+               }
+
+               ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+               if (ret < 0) {
+                       goto end_rcu_unlock;
+               }
+       }
+
+end_rcu_unlock:
+       rcu_read_unlock();
+
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+       if (send_ret < 0) {
+               ERR("Relay sending close index id reply");
+               ret = send_ret;
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * relay_process_control: Process the commands received on the control socket
  */
 static
 int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
-               struct relay_command *cmd, struct lttng_ht *streams_ht)
+               struct relay_command *cmd, struct lttng_ht *streams_ht,
+               struct lttng_ht *index_streams_ht,
+               struct lttng_ht *indexes_ht)
 {
        int ret = 0;
 
@@ -1543,6 +1704,9 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_END_DATA_PENDING:
                ret = relay_end_data_pending(recv_hdr, cmd, streams_ht);
                break;
+       case RELAYD_SEND_INDEX:
+               ret = relay_recv_index(recv_hdr, cmd, streams_ht, indexes_ht);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
@@ -1559,12 +1723,14 @@ end:
  * relay_process_data: Process the data received on the data socket
  */
 static
-int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
+int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht,
+               struct lttng_ht *indexes_ht)
 {
-       int ret = 0;
+       int ret = 0, rotate_index = 0, index_created = 0;
        struct relay_stream *stream;
+       struct relay_index *index, *wr_index = NULL;
        struct lttcomm_relayd_data_hdr data_hdr;
-       uint64_t stream_id;
+       uint64_t stream_id, data_offset;
        uint64_t net_seq_num;
        uint32_t data_size;
 
@@ -1587,7 +1753,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
        stream = relay_stream_from_stream_id(stream_id, streams_ht);
        if (!stream) {
                ret = -1;
-               goto end_unlock;
+               goto end_rcu_unlock;
        }
 
        data_size = be32toh(data_hdr.data_size);
@@ -1599,7 +1765,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
                        ERR("Allocating data buffer");
                        free(data_buffer);
                        ret = -1;
-                       goto end_unlock;
+                       goto end_rcu_unlock;
                }
                data_buffer = tmp_data_ptr;
                data_buffer_size = data_size;
@@ -1617,33 +1783,103 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
                        DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
                }
                ret = -1;
-               goto end_unlock;
+               goto end_rcu_unlock;
        }
 
+       /* Check if a rotation is needed. */
        if (stream->tracefile_size > 0 &&
                        (stream->tracefile_size_current + data_size) >
                        stream->tracefile_size) {
-               ret = utils_rotate_stream_file(stream->path_name,
-                               stream->channel_name, stream->tracefile_size,
-                               stream->tracefile_count, -1, -1,
-                               stream->fd, &(stream->tracefile_count_current),
-                               &stream->fd);
+               ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
+                               stream->tracefile_size, stream->tracefile_count,
+                               relayd_uid, relayd_gid, stream->fd,
+                               &(stream->tracefile_count_current), &stream->fd);
                if (ret < 0) {
-                       ERR("Rotating output file");
-                       goto end;
+                       ERR("Rotating stream output file");
+                       goto end_rcu_unlock;
                }
-               stream->fd = ret;
                /* Reset current size because we just perform a stream rotation. */
                stream->tracefile_size_current = 0;
+               rotate_index = 1;
+       }
+
+       /* Get data offset because we are about to update the index. */
+       data_offset = htobe64(stream->tracefile_size_current);
+
+       /*
+        * Lookup for an existing index for that stream id/sequence number. If on
+        * exists, the control thread already received the data for it thus we need
+        * to write it on disk.
+        */
+       index = relay_index_find(stream_id, net_seq_num, indexes_ht);
+       if (!index) {
+               /* A successful creation will add the object to the HT. */
+               index = relay_index_create(stream->stream_handle, net_seq_num);
+               if (!index) {
+                       goto end_rcu_unlock;
+               }
+               index_created = 1;
+       }
+
+       if (rotate_index || stream->index_fd < 0) {
+               index->to_close_fd = stream->index_fd;
+               ret = index_create_file(stream->path_name, stream->channel_name,
+                               relayd_uid, relayd_gid, stream->tracefile_size,
+                               stream->tracefile_count_current);
+               if (ret < 0) {
+                       /* This will close the stream's index fd if one. */
+                       relay_index_free_safe(index);
+                       goto end_rcu_unlock;
+               }
+               stream->index_fd = ret;
+       }
+       index->fd = stream->index_fd;
+       index->index_data.offset = data_offset;
+
+       if (index_created) {
+               /*
+                * Try to add the relay index object to the hash table. If an object
+                * already exist, destroy back the index created and set the data.
+                */
+               relay_index_add(index, indexes_ht, &wr_index);
+               if (wr_index) {
+                       /* Copy back data from the created index. */
+                       wr_index->fd = index->fd;
+                       wr_index->to_close_fd = index->to_close_fd;
+                       wr_index->index_data.offset = data_offset;
+                       free(index);
+               }
+       } else {
+               /* The index already exists so write it on disk. */
+               wr_index = index;
        }
-       stream->tracefile_size_current += data_size;
+
+       /* Do we have a writable ready index to write on disk. */
+       if (wr_index) {
+               /* Starting at 2.4, create the index file if none available. */
+               if (cmd->minor >= 4 && stream->index_fd < 0) {
+                       ret = index_create_file(stream->path_name, stream->channel_name,
+                                       relayd_uid, relayd_gid, stream->tracefile_size,
+                                       stream->tracefile_count_current);
+                       if (ret < 0) {
+                               goto end_rcu_unlock;
+                       }
+                       stream->index_fd = ret;
+               }
+
+               ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+               if (ret < 0) {
+                       goto end_rcu_unlock;
+               }
+       }
+
        do {
                ret = write(stream->fd, data_buffer, data_size);
        } while (ret < 0 && errno == EINTR);
        if (ret < 0 || ret != data_size) {
                ERR("Relay error writing data to file");
                ret = -1;
-               goto end_unlock;
+               goto end_rcu_unlock;
        }
 
        DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
@@ -1651,8 +1887,9 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
 
        ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
        if (ret < 0) {
-               goto end_unlock;
+               goto end_rcu_unlock;
        }
+       stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size);
 
        stream->prev_seq = net_seq_num;
 
@@ -1665,6 +1902,11 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
                if (cret < 0) {
                        PERROR("close stream process data");
                }
+
+               cret = close(stream->index_fd);
+               if (cret < 0) {
+                       PERROR("close stream index_fd");
+               }
                iter.iter.node = &stream->stream_n.node;
                ret = lttng_ht_del(streams_ht, &iter);
                assert(!ret);
@@ -1673,7 +1915,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
                DBG("Closed tracefile %d after recv data", stream->fd);
        }
 
-end_unlock:
+end_rcu_unlock:
        rcu_read_unlock();
 end:
        return ret;
@@ -1769,6 +2011,7 @@ void *relay_thread_worker(void *data)
        struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
        struct lttng_ht *streams_ht;
+       struct lttng_ht *index_streams_ht;
        struct lttcomm_relayd_hdr recv_hdr;
 
        DBG("[thread] Relay worker started");
@@ -1787,6 +2030,12 @@ void *relay_thread_worker(void *data)
                goto streams_ht_error;
        }
 
+       /* Tables of received indexes indexed by index handle and net_seq_num. */
+       indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64);
+       if (!indexes_ht) {
+               goto indexes_ht_error;
+       }
+
        ret = create_thread_poll_set(&events, 2);
        if (ret < 0) {
                goto error_poll_create;
@@ -1898,7 +2147,9 @@ restart:
                                                        }
                                                        ret = relay_process_control(&recv_hdr,
                                                                        relay_connection,
-                                                                       streams_ht);
+                                                                       streams_ht,
+                                                                       index_streams_ht,
+                                                                       indexes_ht);
                                                        if (ret < 0) {
                                                                /* Clear the session on error. */
                                                                relay_cleanup_poll_connection(&events, pollfd);
@@ -1970,7 +2221,8 @@ restart:
                                                continue;
                                        }
 
-                                       ret = relay_process_data(relay_connection, streams_ht);
+                                       ret = relay_process_data(relay_connection, streams_ht,
+                                                       indexes_ht);
                                        /* connection closed */
                                        if (ret < 0) {
                                                relay_cleanup_poll_connection(&events, pollfd);
@@ -2014,6 +2266,8 @@ error:
        }
        rcu_read_unlock();
 error_poll_create:
+       lttng_ht_destroy(indexes_ht);
+indexes_ht_error:
        lttng_ht_destroy(streams_ht);
 streams_ht_error:
        lttng_ht_destroy(relay_connections_ht);
@@ -2089,10 +2343,12 @@ int main(int argc, char **argv)
                }
        }
 
-       /* Check if daemon is UID = 0 */
-       is_root = !getuid();
+       /* We need those values for the file/dir creation. */
+       relayd_uid = getuid();
+       relayd_gid = getgid();
 
-       if (!is_root) {
+       /* Check if daemon is UID = 0 */
+       if (relayd_uid == 0) {
                if (control_uri->port < 1024 || data_uri->port < 1024) {
                        ERR("Need to be root to use ports < 1024");
                        ret = -1;
index 2bb5ce7e8adee8a235d9b5b6783b66fe5ecef7fd..920948760264405f16941f0069617513e9d27d6f 100644 (file)
@@ -24,6 +24,7 @@
 #include <unistd.h>
 
 #include <common/common.h>
+#include <common/index/index.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 
@@ -322,3 +323,35 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        /* Free stream within a RCU call. */
        consumer_stream_free(stream);
 }
+
+/*
+ * Write index of a specific stream either on the relayd or local disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_write_index(struct lttng_consumer_stream *stream,
+               struct lttng_packet_index *index)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(index);
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               ret = relayd_send_index(&relayd->control_sock, index,
+                               stream->relayd_stream_id, stream->next_net_seq_num - 1);
+       } else {
+               ret = index_write(stream->index_fd, index,
+                               sizeof(struct lttng_packet_index));
+       }
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       rcu_read_unlock();
+       return ret;
+}
index 3096e1e7801d535084bfb3fd7ef076d0d11237a9..956bb6328692215984f5081102f48ae5f6b3de27 100644 (file)
@@ -68,4 +68,10 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
  */
 void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream);
 
+/*
+ * Write index of a specific stream either on the relayd or local disk.
+ */
+int consumer_stream_write_index(struct lttng_consumer_stream *stream,
+               struct lttng_packet_index *index);
+
 #endif /* LTTNG_CONSUMER_STREAM_H */
index 0661f1264fa0e7fa136a9b4772f302558287ab56..191367cd0cd55c55be523cdf139182d4dbe8feca 100644 (file)
@@ -729,6 +729,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                if (ret < 0) {
                        goto end;
                }
+
                uatomic_inc(&relayd->refcount);
                stream->sent_to_relayd = 1;
        } else {
index 7a3d59c9f09927580829ef825c43e1fc8ee2c8c4..b8db779dc07f8e6426a78049d8cf9512dc318b66 100644 (file)
 
 /* Suffix of an index file. */
 #define DEFAULT_INDEX_FILE_SUFFIX                      ".idx"
+#define DEFAULT_INDEX_DIR                                      "index"
 
 extern size_t default_channel_subbuf_size;
 extern size_t default_metadata_subbuf_size;
index 3d22ca61cc1ad720e7e50270137053c6d5f5a787..89b4fd769eab98a49137fb08087388caec4469e4 100644 (file)
@@ -18,6 +18,7 @@
 
 #define _GNU_SOURCE
 #include <assert.h>
+#include <sys/stat.h>
 
 #include <common/common.h>
 #include <common/defaults.h>
@@ -35,8 +36,25 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
 {
        int ret, fd = -1;
        struct lttng_packet_index_file_hdr hdr;
+       char fullpath[PATH_MAX];
 
-       ret = utils_create_stream_file(path_name, stream_name, size, count, uid,
+       ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR,
+                       path_name);
+       if (ret < 0) {
+               PERROR("snprintf index path");
+               goto error;
+       }
+
+       /* Create index directory if necessary. */
+       ret = run_as_mkdir(fullpath, S_IRWXU | S_IRWXG, uid, gid);
+       if (ret < 0) {
+               if (ret != -EEXIST) {
+                       ERR("Index trace directory creation error");
+                       goto error;
+               }
+       }
+
+       ret = utils_create_stream_file(fullpath, stream_name, size, count, uid,
                        gid, DEFAULT_INDEX_FILE_SUFFIX);
        if (ret < 0) {
                goto error;
index eaccce3d0df294815be8289ec6983deea3c7f042..315af2eb86e540900ad08496b6b0cff83fef4804 100644 (file)
@@ -907,18 +907,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 0;
+       int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
        struct lttng_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
-       /* Indicate that for this stream we have to write the index. */
-       if (stream->index_fd >= 0) {
-               write_index = 1;
-       }
-
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
@@ -942,11 +937,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       if (!stream->metadata_flag && write_index) {
+       if (!stream->metadata_flag) {
                ret = get_index_values(&index, infd);
                if (ret < 0) {
                        goto end;
                }
+       } else {
+               write_index = 0;
        }
 
        switch (stream->chan->output) {
@@ -1028,12 +1025,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 
        /* Write index if needed. */
-       if (write_index) {
-               err = index_write(stream->index_fd, &index, sizeof(index));
-               if (err < 0) {
-                       ret = -1;
-                       goto end;
-               }
+       if (!write_index) {
+               goto end;
+       }
+
+       err = consumer_stream_write_index(stream, &index);
+       if (err < 0) {
+               goto end;
        }
 
 end:
index 2283865cf0d86be6c905baa287fd5809f17af8f2..bb20a64a0cc5d6cb8ffc04f2439c0c581f71803b 100644 (file)
@@ -26,6 +26,7 @@
 #include <common/common.h>
 #include <common/defaults.h>
 #include <common/sessiond-comm/relayd.h>
+#include <common/index/lttng-index.h>
 
 #include "relayd.h"
 
@@ -671,3 +672,63 @@ int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
 error:
        return ret;
 }
+
+/*
+ * Send index to the relayd.
+ */
+int relayd_send_index(struct lttcomm_relayd_sock *rsock,
+               struct lttng_packet_index *index, uint64_t relay_stream_id,
+               uint64_t net_seq_num)
+{
+       int ret;
+       struct lttcomm_relayd_index msg;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       if (rsock->minor < 4) {
+               DBG("Not sending indexes before protocol 2.4");
+               ret = 0;
+               goto error;
+       }
+
+       DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
+
+       msg.relay_stream_id = htobe64(relay_stream_id);
+       msg.net_seq_num = htobe64(net_seq_num);
+
+       /* The index is already in big endian. */
+       msg.packet_size = index->packet_size;
+       msg.content_size = index->content_size;
+       msg.timestamp_begin = index->timestamp_begin;
+       msg.timestamp_end = index->timestamp_end;
+       msg.events_discarded = index->events_discarded;
+       msg.stream_id = index->stream_id;
+
+       /* Send command */
+       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Receive response */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd send index replied error %d", reply.ret_code);
+       } else {
+               /* Success */
+               ret = 0;
+       }
+
+error:
+       return ret;
+}
index dd435e905dd1d2be3136c230c2697615369be7b5..a49bab733ec3975a3a66b4372724a10361093380 100644 (file)
@@ -43,5 +43,8 @@ int relayd_quiescent_control(struct lttcomm_relayd_sock *sock,
 int relayd_begin_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id);
 int relayd_end_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id,
                unsigned int *is_data_inflight);
+int relayd_send_index(struct lttcomm_relayd_sock *rsock,
+               struct lttng_packet_index *index, uint64_t relay_stream_id,
+               uint64_t net_seq_num);
 
 #endif /* _RELAYD_H */
index ed094ac9d8d73de7936b7921508457efe202289e..3df68682a60c8fc9038dd71f10567062dc4031d1 100644 (file)
@@ -26,6 +26,7 @@
 
 #include <lttng/lttng.h>
 #include <common/defaults.h>
+#include <common/index/lttng-index.h>
 #include <config.h>
 
 #define RELAYD_VERSION_COMM_MAJOR             VERSION_MAJOR
@@ -149,4 +150,18 @@ struct lttcomm_relayd_quiescent_control {
        uint64_t stream_id;
 } LTTNG_PACKED;
 
+/*
+ * Index data.
+ */
+struct lttcomm_relayd_index {
+       uint64_t relay_stream_id;
+       uint64_t net_seq_num;
+       uint64_t packet_size;
+       uint64_t content_size;
+       uint64_t timestamp_begin;
+       uint64_t timestamp_end;
+       uint64_t events_discarded;
+       uint64_t stream_id;
+} LTTNG_PACKED;
+
 #endif /* _RELAYD_COMM */
index cf6262e6aabe6e1bd9e5b932f943614039b71ea7..39ab69bf92946f5f15962d64b8e2e74215cccf96 100644 (file)
@@ -103,6 +103,9 @@ enum lttcomm_relayd_command {
        RELAYD_QUIESCENT_CONTROL            = 9,
        RELAYD_BEGIN_DATA_PENDING           = 10,
        RELAYD_END_DATA_PENDING             = 11,
+       RELAYD_ADD_INDEX                    = 12,
+       RELAYD_SEND_INDEX                   = 13,
+       RELAYD_CLOSE_INDEX                  = 14,
 };
 
 /*
index f0147af4eee84b83164ea692601cb19908729766..113ae959ea32e75b52fd13e7a5cc7d0d0a1867fa 100644 (file)
@@ -1664,7 +1664,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 0;
+       int err, write_index = 1;
        long ret = 0;
        char dummy;
        struct ustctl_consumer_stream *ustream;
@@ -1680,11 +1680,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        /* Ease our life for what's next. */
        ustream = stream->ustream;
 
-       /* Indicate that for this stream we have to write the index. */
-       if (stream->index_fd >= 0) {
-               write_index = 1;
-       }
-
        /* We can consume the 1 byte written into the wait_fd by UST */
        if (stream->monitor && !stream->hangup_flush_done) {
                ssize_t readlen;
@@ -1743,12 +1738,14 @@ retry:
        }
        assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
 
-       if (!stream->metadata_flag && write_index) {
+       if (!stream->metadata_flag) {
                index.offset = htobe64(stream->out_fd_offset);
                ret = get_index_values(&index, ustream);
                if (ret < 0) {
                        goto end;
                }
+       } else {
+               write_index = 0;
        }
 
        /* Get the full padded subbuffer size */
@@ -1788,12 +1785,14 @@ retry:
        assert(err == 0);
 
        /* Write index if needed. */
-       if (write_index) {
-               err = index_write(stream->index_fd, &index, sizeof(index));
-               if (err < 0) {
-                       ret = -1;
-                       goto end;
-               }
+       if (!write_index) {
+               goto end;
+       }
+
+       assert(!stream->metadata_flag);
+       err = consumer_stream_write_index(stream, &index);
+       if (err < 0) {
+               goto end;
        }
 
 end:
This page took 0.042791 seconds and 4 git commands to generate.