Add consumer-stream.c/.h in libconsumer
authorDavid Goulet <dgoulet@efficios.com>
Mon, 27 May 2013 19:17:19 +0000 (15:17 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Thu, 27 Jun 2013 15:29:08 +0000 (11:29 -0400)
These new files contain relevant calls to a lttng_consumer_stream object
and for now breaks down consumer_del_stream() from consumer.c that got
out of hand in terms of size and can not be used if the stream was not
previously successfully sent to a thread making difficult the deletion
of a stream in some error path or in no monitor mode (future mode for
streams with snapshots) outside of this use case.

It helps also modularize the consumer object and makes the code easier
to maintain and to better evolve.

At this commit, no components use this yet. Future work will
incrementaly move every consumer stream specific actions to that
seperate API.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/Makefile.am
src/common/consumer-stream.c [new file with mode: 0644]
src/common/consumer-stream.h [new file with mode: 0644]
src/common/consumer.c
src/common/consumer.h

index 6ba6c2b59b6c42fb0269680da7bc61323f625783..aedbc22bd87b507c4e8762729b4ff91d9fcd2bf4 100644 (file)
@@ -21,7 +21,7 @@ libcommon_la_LIBADD = -luuid
 noinst_LTLIBRARIES += libconsumer.la
 
 libconsumer_la_SOURCES = consumer.c consumer.h consumer-metadata-cache.c \
-                         consumer-timer.c
+                         consumer-timer.c consumer-stream.c consumer-stream.h
 
 libconsumer_la_LIBADD = \
                $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \
diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c
new file mode 100644 (file)
index 0000000..24f1b8a
--- /dev/null
@@ -0,0 +1,238 @@
+/*
+ * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <sys/mman.h>
+#include <unistd.h>
+
+#include <common/common.h>
+#include <common/relayd/relayd.h>
+#include <common/ust-consumer/ust-consumer.h>
+
+#include "consumer-stream.h"
+
+/*
+ * RCU call to free stream. MUST only be used with call_rcu().
+ */
+static void free_stream_rcu(struct rcu_head *head)
+{
+       struct lttng_ht_node_u64 *node =
+               caa_container_of(head, struct lttng_ht_node_u64, head);
+       struct lttng_consumer_stream *stream =
+               caa_container_of(node, struct lttng_consumer_stream, node);
+
+       pthread_mutex_destroy(&stream->lock);
+       free(stream);
+}
+
+/*
+ * Close stream on the relayd side. This call can destroy a relayd if the
+ * conditions are met.
+ *
+ * A RCU read side lock MUST be acquired if the relayd object was looked up in
+ * a hash table before calling this.
+ */
+void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
+               struct consumer_relayd_sock_pair *relayd)
+{
+       int ret;
+
+       assert(stream);
+       assert(relayd);
+
+       uatomic_dec(&relayd->refcount);
+       assert(uatomic_read(&relayd->refcount) >= 0);
+
+       /* Closing streams requires to lock the control socket. */
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_send_close_stream(&relayd->control_sock,
+                       stream->relayd_stream_id,
+                       stream->next_net_seq_num - 1);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               DBG("Unable to close stream on the relayd. Continuing");
+               /*
+                * Continue here. There is nothing we can do for the relayd.
+                * Chances are that the relayd has closed the socket so we just
+                * continue cleaning up.
+                */
+       }
+
+       /* Both conditions are met, we destroy the relayd. */
+       if (uatomic_read(&relayd->refcount) == 0 &&
+                       uatomic_read(&relayd->destroy_flag)) {
+               consumer_destroy_relayd(relayd);
+       }
+}
+
+/*
+ * Close stream's file descriptors and, if needed, close stream also on the
+ * relayd side.
+ *
+ * The consumer data lock MUST be acquired.
+ * The stream lock MUST be acquired.
+ */
+void consumer_stream_close(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               if (stream->mmap_base != NULL) {
+                       ret = munmap(stream->mmap_base, stream->mmap_len);
+                       if (ret != 0) {
+                               PERROR("munmap");
+                       }
+               }
+
+               if (stream->wait_fd >= 0) {
+                       ret = close(stream->wait_fd);
+                       if (ret) {
+                               PERROR("close");
+                       }
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_del_stream(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+       }
+
+       /* Close output fd. Could be a socket or local file at this point. */
+       if (stream->out_fd >= 0) {
+               ret = close(stream->out_fd);
+               if (ret) {
+                       PERROR("close");
+               }
+       }
+
+       /* Check and cleanup relayd if needed. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               consumer_stream_relayd_close(stream, relayd);
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Delete the stream from all possible hash tables.
+ *
+ * The consumer data lock MUST be acquired.
+ * The stream lock MUST be acquired.
+ */
+void consumer_stream_delete(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+
+       assert(stream);
+
+       rcu_read_lock();
+
+       if (ht) {
+               iter.iter.node = &stream->node.node;
+               ret = lttng_ht_del(ht, &iter);
+               assert(!ret);
+       }
+
+       /* Delete from stream per channel ID hash table. */
+       iter.iter.node = &stream->node_channel_id.node;
+       /*
+        * The returned value is of no importance. Even if the node is NOT in the
+        * hash table, we continue since we may have been called by a code path
+        * that did not add the stream to a (all) hash table. Same goes for the
+        * next call ht del call.
+        */
+       (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
+
+       /* Delete from the global stream list. */
+       iter.iter.node = &stream->node_session_id.node;
+       /* See the previous ht del on why we ignore the returned value. */
+       (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
+
+       rcu_read_unlock();
+
+       /* Decrement the stream count of the global consumer data. */
+       assert(consumer_data.stream_count > 0);
+       consumer_data.stream_count--;
+}
+
+/*
+ * Free the given stream within a RCU call.
+ */
+void consumer_stream_free(struct lttng_consumer_stream *stream)
+{
+       assert(stream);
+
+       call_rcu(&stream->node.head, free_stream_rcu);
+}
+
+/*
+ * Destroy a stream completely. This will delete, close and free the stream.
+ * Once return, the stream is NO longer usable. Its channel may get destroyed
+ * if conditions are met.
+ *
+ * This MUST be called WITHOUT the consumer data and stream lock acquired.
+ */
+void consumer_stream_destroy(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
+{
+       struct lttng_consumer_channel *free_chan = NULL;
+
+       assert(stream);
+
+       DBG("Consumer stream destroy - wait_fd: %d", stream->wait_fd);
+
+       pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
+
+       /* Remove every reference of the stream in the consumer. */
+       consumer_stream_delete(stream, ht);
+
+       /* Close down everything including the relayd if one. */
+       consumer_stream_close(stream);
+
+       /* Update refcount of channel and see if we need to destroy it. */
+       if (!uatomic_sub_return(&stream->chan->refcount, 1)
+                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
+               free_chan = stream->chan;
+       }
+
+       /* Indicates that the consumer data state MUST be updated after this. */
+       consumer_data.need_update = 1;
+
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
+
+       if (free_chan) {
+               consumer_del_channel(free_chan);
+       }
+
+       /* Free stream within a RCU call. */
+       consumer_stream_free(stream);
+}
diff --git a/src/common/consumer-stream.h b/src/common/consumer-stream.h
new file mode 100644 (file)
index 0000000..3036b2c
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef LTTNG_CONSUMER_STREAM_H
+#define LTTNG_CONSUMER_STREAM_H
+
+#include "consumer.h"
+
+/*
+ * Close stream's file descriptors and, if needed, close stream also on the
+ * relayd side.
+ *
+ * The stream lock MUST be acquired.
+ * The consumer data lock MUST be acquired.
+ */
+void consumer_stream_close(struct lttng_consumer_stream *stream);
+
+/*
+ * Close stream on the relayd side. This call can destroy a relayd if the
+ * conditions are met.
+ *
+ * A RCU read side lock MUST be acquired if the relayd object was looked up in
+ * a hash table before calling this.
+ */
+void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
+               struct consumer_relayd_sock_pair *relayd);
+
+/*
+ * Delete the stream from all possible hash tables.
+ *
+ * The consumer data lock MUST be acquired.
+ */
+void consumer_stream_delete(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht);
+
+/*
+ * Free the given stream within a RCU call.
+ */
+void consumer_stream_free(struct lttng_consumer_stream *stream);
+
+/*
+ * Destroy a stream completely. This will delete, close and free the stream.
+ * Once return, the stream is NO longer usable. Its channel may get destroyed
+ * if conditions are met.
+ *
+ * This MUST be called WITHOUT the consumer data and stream lock acquired.
+ */
+void consumer_stream_destroy(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht);
+
+#endif /* LTTNG_CONSUMER_STREAM_H */
index 540c59f40d2f58d18d53586ed3eaf14a39e8300e..d46f19342a74aadd4a99d8f88221a3913fcd58ce 100644 (file)
@@ -254,10 +254,8 @@ static void free_relayd_rcu(struct rcu_head *head)
 
 /*
  * Destroy and free relayd socket pair object.
- *
- * This function MUST be called with the consumer_data lock acquired.
  */
-static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -337,7 +335,7 @@ static void cleanup_relayd_ht(void)
 
        cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
                        node.node) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 
        rcu_read_unlock();
@@ -404,7 +402,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * Delete the relayd from the relayd hash table, close the sockets and free
         * the object in a RCU call.
         */
-       destroy_relayd(relayd);
+       consumer_destroy_relayd(relayd);
 
        /* Set inactive endpoint to all streams */
        update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
@@ -436,7 +434,7 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
 
        /* Destroy the relayd if refcount is 0 */
        if (uatomic_read(&relayd->refcount) == 0) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 }
 
@@ -539,7 +537,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
+                       consumer_destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
@@ -1957,7 +1955,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
+                       consumer_destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
index 650397adaf6a6f815b14a9c50da5690565ef6c0e..b4c9aeafd135002c189a1c27e50a5c29a9cc53b2 100644 (file)
@@ -85,6 +85,8 @@ enum consumer_channel_type {
        CONSUMER_CHANNEL_TYPE_DATA      = 1,
 };
 
+extern struct lttng_consumer_global_data consumer_data;
+
 struct stream_list {
        struct cds_list_head head;
        unsigned int count;
@@ -531,5 +533,6 @@ int consumer_send_status_channel(int sock,
                struct lttng_consumer_channel *channel);
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
                uint64_t key);
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.031892 seconds and 4 git commands to generate.