Use shm handle, fix allocation space, take care of alignment
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index 2b4ecc24b1c44769afe455f40773b6c6db5233c8..01d0f5e87007c269f15026366a35fc1ae9827e85 100644 (file)
@@ -39,7 +39,9 @@
  */
 
 #include <sys/types.h>
-#include <sys/shm.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
 #include <urcu/compiler.h>
 #include <urcu/ref.h>
 
 #include "frontend.h"
 #include "shm.h"
 
+#ifndef max
+#define max(a, b)      ((a) > (b) ? (a) : (b))
+#endif
+
 /*
  * Internal structure representing offsets to use at a sub-buffer switch.
  */
@@ -73,8 +79,8 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf)
        struct channel *chan = shmp(buf->backend.chan);
 
        lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
-       free(shmp(buf->commit_hot));
-       free(shmp(buf->commit_cold));
+       /* buf->commit_hot will be freed by shm teardown */
+       /* buf->commit_cold will be freed by shm teardown */
 
        lib_ring_buffer_backend_free(&buf->backend);
 }
@@ -163,6 +169,9 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
        if (ret)
                return ret;
 
+       align_shm(shm_header,
+               max(__alignof__(struct commit_counters_hot),
+                   __alignof__(struct commit_counters_cold)));
        set_shmp(&buf->commit_hot,
                 zalloc_shm(shm_header,
                        sizeof(*buf->commit_hot) * chan->backend.num_subbuf));
@@ -171,6 +180,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                goto free_chanbuf;
        }
 
+       align_shm(shm_header, __alignof__(struct commit_counters_cold));
        set_shmp(&buf->commit_cold,
                 zalloc_shm(shm_header,
                        sizeof(*buf->commit_cold) * chan->backend.num_subbuf));
@@ -358,10 +368,24 @@ static void channel_unregister_notifiers(struct channel *chan)
        channel_backend_unregister_notifiers(&chan->backend);
 }
 
-static void channel_free(struct channel *chan)
+static void channel_free(struct shm_handle *handle)
 {
+       struct shm_header *header = handle->header;
+       struct channel *chan = shmp(header->chan);
+       int ret;
+
        channel_backend_free(&chan->backend);
-       free(chan);
+       /* chan is freed by shm teardown */
+       ret = munmap(header, header->shm_size);
+       if (ret) {
+               PERROR("umnmap");
+               assert(0);
+       }
+       ret = close(handle->shmfd);
+       if (ret) {
+               PERROR("close");
+               assert(0);
+       }
 }
 
 /**
@@ -378,72 +402,96 @@ static void channel_free(struct channel *chan)
  *                         padding to let readers get those sub-buffers.
  *                         Used for live streaming.
  * @read_timer_interval: Time interval (in us) to wake up pending readers.
- * @shmid: shared memory ID (output)
+ * @shmfd: shared memory file descriptor (output, needs to be closed by
+ *         the caller)
  *
  * Holds cpu hotplug.
  * Returns NULL on failure.
  */
-struct channel *channel_create(const struct lib_ring_buffer_config *config,
+struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
                   const char *name, void *priv, void *buf_addr,
                   size_t subbuf_size,
                   size_t num_subbuf, unsigned int switch_timer_interval,
-                  unsigned int read_timer_interval,
-                  int *shmid)
+                  unsigned int read_timer_interval)
 {
-       int ret, cpu;
+       int ret, cpu, shmfd;
        struct channel *chan;
-       size_t shmsize, bufshmsize;
+       size_t shmsize, bufshmsize, bufshmalign;
        struct shm_header *shm_header;
        unsigned long num_subbuf_alloc;
+       struct shm_handle *handle;
 
        if (lib_ring_buffer_check_config(config, switch_timer_interval,
                                         read_timer_interval))
                return NULL;
 
+       handle = zmalloc(sizeof(struct shm_handle));
+       if (!handle)
+               return NULL;
+
        /* Calculate the shm allocation layout */
        shmsize = sizeof(struct shm_header);
+       shmsize += offset_align(shmsize, __alignof__(struct channel));
        shmsize += sizeof(struct channel);
 
        /* Per-cpu buffer size: control (prior to backend) */
+       shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer));
        bufshmsize = sizeof(struct lib_ring_buffer);
        shmsize += bufshmsize * num_possible_cpus();
 
        /* Per-cpu buffer size: backend */
+       shmsize += offset_align(shmsize, PAGE_SIZE);
        /* num_subbuf + 1 is the worse case */
        num_subbuf_alloc = num_subbuf + 1;
        bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc;
-       bufshmsize += subbuf_size * (num_subbuf_alloc);
-       bufshmsize += (sizeof(struct lib_ring_buffer_backend_pages) + subbuf_size) * num_subbuf_alloc;
+       bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
+       bufshmsize += subbuf_size * num_subbuf_alloc;
+       bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages));
+       bufshmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
+       bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
        bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
+       bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
        shmsize += bufshmsize * num_possible_cpus();
 
        /* Per-cpu buffer size: control (after backend) */
-       bufshmsize += sizeof(struct commit_counters_hot) * num_subbuf;
+       shmsize += offset_align(shmsize,
+                       max(__alignof__(struct commit_counters_hot),
+                           __alignof__(struct commit_counters_cold)));
+       bufshmsize = sizeof(struct commit_counters_hot) * num_subbuf;
+       bufshmsize += offset_align(bufshmsize, __alignof__(struct commit_counters_cold));
        bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf;
+       shmsize += bufshmsize * num_possible_cpus();
 
-       /* Allocate shm */
-       *shmid = shmget(getpid(), shmsize, IPC_CREAT | IPC_EXCL | 0700);
-       if (*shmid < 0) {
-               if (errno == EINVAL)
-                       ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased.");
-               else
-                       PERROR("shmget");
-               return NULL;
+       /*
+        * Allocate shm, and immediately unlink its shm oject, keeping
+        * only the file descriptor as a reference to the object. If it
+        * already exists (caused by short race window during which the
+        * global object exists in a concurrent shm_open), simply retry.
+        */
+       do {
+               shmfd = shm_open("/ust-shm-tmp",
+                                 O_CREAT | O_EXCL | O_RDWR, 0700);
+       } while (shmfd < 0 && errno == EEXIST);
+       if (shmfd < 0) {
+               PERROR("shm_open");
+               goto error_shm_open;
        }
-
-       shm_header = shmat(*shmid, NULL, 0);
-       if (shm_header == (void *) -1) {
-               perror("shmat");
-               goto destroy_shmem;
+       ret = shm_unlink("/ust-shm-tmp");
+       if (ret) {
+               PERROR("shm_unlink");
+               goto error_unlink;
+       }
+       ret = ftruncate(shmfd, shmsize);
+       if (ret) {
+               PERROR("ftruncate");
+               goto error_ftruncate;
        }
 
-       /* Already mark the shared memory for destruction. This will occur only
-         * when all users have detached.
-        */
-       ret = shmctl(*shmid, IPC_RMID, NULL);
-       if (ret == -1) {
-               perror("shmctl");
-               goto destroy_shmem;
+       shm_header = mmap(NULL, shmsize, PROT_READ | PROT_WRITE,
+                         MAP_SHARED, shmfd, 0);
+       if (shm_header == MAP_FAILED) {
+               PERROR("mmap");
+               goto error_mmap;
        }
 
        shm_header->magic = SHM_MAGIC;
@@ -453,6 +501,7 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
        shm_header->shm_size = shmsize;
        shm_header->shm_allocated = sizeof(struct shm_header);
 
+       align_shm(shm_header, __alignof__(struct channel));
        chan = zalloc_shm(shm_header, sizeof(struct channel));
        if (!chan)
                goto destroy_shmem;
@@ -467,7 +516,6 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
        //TODO
        //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
        //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
-       urcu_ref_init(&chan->ref);
        //TODO
        //init_waitqueue_head(&chan->read_wait);
        //init_waitqueue_head(&chan->hp_wait);
@@ -490,21 +538,33 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config,
                lib_ring_buffer_start_read_timer(buf);
        }
 
-       return chan;
+       handle->header = shm_header;
+       handle->shmfd = shmfd;
+       return handle;
 
 destroy_shmem:
-       ret = shmctl(*shmid, IPC_RMID, NULL);
-       if (ret == -1) {
-               perror("shmctl");
+       ret = munmap(shm_header, shmsize);
+       if (ret) {
+               PERROR("umnmap");
+               assert(0);
        }
+error_mmap:
+error_ftruncate:
+error_unlink:
+       ret = close(shmfd);
+       if (ret) {
+               PERROR("close");
+               assert(0);
+       }
+error_shm_open:
+       free(handle);
        return NULL;
 }
 
 static
-void channel_release(struct urcu_ref *ref)
+void channel_release(struct shm_handle *handle)
 {
-       struct channel *chan = caa_container_of(ref, struct channel, ref);
-       channel_free(chan);
+       channel_free(handle);
 }
 
 /**
@@ -512,17 +572,19 @@ void channel_release(struct urcu_ref *ref)
  * @chan: channel to destroy
  *
  * Holds cpu hotplug.
- * Call "destroy" callback, finalize channels, wait for readers to release their
- * reference, then destroy ring buffer data. Note that when readers have
- * completed data consumption of finalized channels, get_subbuf() will return
- * -ENODATA. They should release their handle at that point.
- * Returns the private data pointer.
+ * Call "destroy" callback, finalize channels, decrement the channel
+ * reference count. Note that when readers have completed data
+ * consumption of finalized channels, get_subbuf() will return -ENODATA.
+ * They should release their handle at that point.  Returns the private
+ * data pointer.
  */
-void *channel_destroy(struct channel *chan)
+void *channel_destroy(struct shm_handle *handle)
 {
-       int cpu;
+       struct shm_header *header = handle->header;
+       struct channel *chan = shmp(header->chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        void *priv;
+       int cpu;
 
        channel_unregister_notifiers(chan);
 
@@ -560,7 +622,11 @@ void *channel_destroy(struct channel *chan)
        CMM_ACCESS_ONCE(chan->finalized) = 1;
        //wake_up_interruptible(&chan->hp_wait);
        //wake_up_interruptible(&chan->read_wait);
-       urcu_ref_put(&chan->ref, channel_release);
+       /*
+        * sessiond/consumer are keeping a reference on the shm file
+        * descriptor directly. No need to refcount.
+        */
+       channel_release(handle);
        priv = chan->backend.priv;
        return priv;
 }
@@ -581,7 +647,6 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
 
        if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
                return -EBUSY;
-       urcu_ref_get(&chan->ref);
        cmm_smp_mb();
        return 0;
 }
@@ -593,7 +658,6 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
        CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
        cmm_smp_mb();
        uatomic_dec(&buf->active_readers);
-       urcu_ref_put(&chan->ref, channel_release);
 }
 
 /**
This page took 0.027626 seconds and 4 git commands to generate.