Add channel wakeup fd to monitor close
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 6 Mar 2013 21:11:44 +0000 (16:11 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 6 Mar 2013 21:11:44 +0000 (16:11 -0500)
Add channel wakeup fd, so consumer can keep its handle on the stream
wakeup_fd (for periodic timer flush), and yet still discover that an
application has closed a channel or exited.

Requires to be updated in locked-step with lttng-tools
"Add channel wakeup fd to monitor close"

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
include/lttng/ust-abi.h
include/lttng/ust-ctl.h
include/ust-comm.h
liblttng-ust-comm/lttng-ust-comm.c
liblttng-ust-ctl/ustctl.c
liblttng-ust/lttng-ust-abi.c
liblttng-ust/lttng-ust-comm.c
libringbuffer/frontend.h
libringbuffer/ring_buffer_frontend.c
libringbuffer/shm.c
libringbuffer/shm.h

index 8f2233a53fce24c335c2c4066fe9df1391514624..df61cde57c5c0005bb6e72c7d2ccdcbea4d19c2d 100644 (file)
@@ -191,6 +191,7 @@ struct lttng_ust_object_data {
                struct {
                        void *data;
                        enum lttng_ust_chan_type type;
+                       int wakeup_fd;
                } channel;
                struct {
                        int shm_fd;
@@ -279,6 +280,7 @@ struct lttng_ust_obj;
 union ust_args {
        struct {
                void *chan_data;
+               int wakeup_fd;
        } channel;
        struct {
                int shm_fd;
index 3171b3151dd6a3122762f5b316567021672e896f..ed2b513d05b4abd3bc6afceb95567a7e1f7913e7 100644 (file)
@@ -146,6 +146,10 @@ void ustctl_destroy_channel(struct ustctl_consumer_channel *chan);
 
 int ustctl_send_channel_to_sessiond(int sock,
                struct ustctl_consumer_channel *channel);
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *consumer_chan);
 
 int ustctl_write_metadata_to_channel(
                struct ustctl_consumer_channel *channel,
@@ -160,6 +164,8 @@ int ustctl_send_stream_to_sessiond(int sock,
                struct ustctl_consumer_stream *stream);
 int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream);
 int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream);
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream);
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream);
 
 /* Create/destroy stream buffers for read */
 struct ustctl_consumer_stream *
@@ -167,9 +173,6 @@ struct ustctl_consumer_stream *
                        int cpu);
 void ustctl_destroy_stream(struct ustctl_consumer_stream *stream);
 
-int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream);
-int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream);
-
 /* For mmap mode, readable without "get" operation */
 int ustctl_get_mmap_len(struct ustctl_consumer_stream *stream,
                unsigned long *len);
index c3564dc66a6baa1692bdd9130e6a04877a825cfd..ba800900e1011fc8121723f08b8b2e3a0972fae6 100644 (file)
@@ -185,7 +185,7 @@ extern int ustcomm_send_app_cmd(int sock,
 int ustcomm_recv_fd(int sock);
 
 ssize_t ustcomm_recv_channel_from_sessiond(int sock,
-               void **chan_data, uint64_t len);
+               void **chan_data, uint64_t len, int *wakeup_fd);
 int ustcomm_recv_stream_from_sessiond(int sock,
                uint64_t *memory_map_size,
                int *shm_fd, int *wakeup_fd);
index a666ab274b5f3a25af9f77f5cea779938d2da986..00b1d43af0a4ec1610e4d1a83d204a2c064236b3 100644 (file)
@@ -546,10 +546,12 @@ int ustcomm_send_app_cmd(int sock,
  * expected var_len.
  */
 ssize_t ustcomm_recv_channel_from_sessiond(int sock,
-               void **_chan_data, uint64_t var_len)
+               void **_chan_data, uint64_t var_len,
+               int *_wakeup_fd)
 {
        void *chan_data;
-       ssize_t len;
+       ssize_t len, nr_fd;
+       int wakeup_fd;
 
        if (var_len > LTTNG_UST_CHANNEL_DATA_MAX_LEN) {
                len = -EINVAL;
@@ -565,6 +567,18 @@ ssize_t ustcomm_recv_channel_from_sessiond(int sock,
        if (len != var_len) {
                goto error_recv;
        }
+       /* recv wakeup fd */
+       nr_fd = ustcomm_recv_fds_unix_sock(sock, &wakeup_fd, 1);
+       if (nr_fd <= 0) {
+               if (nr_fd < 0) {
+                       len = nr_fd;
+                       goto error_recv;
+               } else {
+                       len = -EIO;
+                       goto error_recv;
+               }
+       }
+       *_wakeup_fd = wakeup_fd;
        *_chan_data = chan_data;
        return len;
 
index fb4330e4da8b139645348cbaf1d5ae6a445ab910..641c508b8c979f20b651f255d8664bf2dafff3d2 100644 (file)
@@ -46,6 +46,8 @@ struct ustctl_consumer_channel {
 
        /* initial attributes */
        struct ustctl_consumer_channel_attr attr;
+       int wait_fd;                            /* monitor close() */
+       int wakeup_fd;                          /* monitor close() */
 };
 
 /*
@@ -95,6 +97,13 @@ int ustctl_release_object(int sock, struct lttng_ust_object_data *data)
 
        switch (data->type) {
        case LTTNG_UST_OBJECT_TYPE_CHANNEL:
+               if (data->u.channel.wakeup_fd >= 0) {
+                       ret = close(data->u.channel.wakeup_fd);
+                       if (ret < 0) {
+                               ret = -errno;
+                               return ret;
+                       }
+               }
                free(data->u.channel.data);
                break;
        case LTTNG_UST_OBJECT_TYPE_STREAM:
@@ -469,6 +478,7 @@ int ustctl_send_channel(int sock,
                enum lttng_ust_chan_type type,
                void *data,
                uint64_t size,
+               int wakeup_fd,
                int send_fd_only)
 {
        ssize_t len;
@@ -502,6 +512,14 @@ int ustctl_send_channel(int sock,
                        return -EIO;
        }
 
+       /* Send wakeup fd */
+       len = ustcomm_send_fds_unix_sock(sock, &wakeup_fd, 1);
+       if (len <= 0) {
+               if (len < 0)
+                       return len;
+               else
+                       return -EIO;
+       }
        return 0;
 }
 
@@ -569,6 +587,7 @@ int ustctl_recv_channel_from_consumer(int sock,
 {
        struct lttng_ust_object_data *channel_data;
        ssize_t len;
+       int wakeup_fd;
        int ret;
 
        channel_data = zmalloc(sizeof(*channel_data));
@@ -615,7 +634,18 @@ int ustctl_recv_channel_from_consumer(int sock,
                        ret = -EINVAL;
                goto error_recv_data;
        }
-
+       /* recv wakeup fd */
+       len = ustcomm_recv_fds_unix_sock(sock, &wakeup_fd, 1);
+       if (len <= 0) {
+               if (len < 0) {
+                       ret = len;
+                       goto error_recv_data;
+               } else {
+                       ret = -EIO;
+                       goto error_recv_data;
+               }
+       }
+       channel_data->u.channel.wakeup_fd = wakeup_fd;
        *_channel_data = channel_data;
        return 0;
 
@@ -715,6 +745,7 @@ int ustctl_send_channel_to_ust(int sock, int session_handle,
                        channel_data->u.channel.type,
                        channel_data->u.channel.data,
                        channel_data->size,
+                       channel_data->u.channel.wakeup_fd,
                        1);
        if (ret)
                return ret;
@@ -833,6 +864,7 @@ int ustctl_send_channel_to_sessiond(int sock,
                        channel->attr.type,
                        table->objects[0].memory_map,
                        table->objects[0].memory_map_size,
+                       channel->wakeup_fd,
                        0);
 }
 
@@ -893,12 +925,30 @@ end:
        return ret;
 }
 
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+       struct channel *chan;
+
+       chan = consumer_chan->chan->chan;
+       return ring_buffer_channel_close_wait_fd(&chan->backend.config,
+                       chan, chan->handle);
+}
+
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+       struct channel *chan;
+
+       chan = consumer_chan->chan->chan;
+       return ring_buffer_channel_close_wakeup_fd(&chan->backend.config,
+                       chan, chan->handle);
+}
+
 int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream)
 {
        struct channel *chan;
 
        chan = stream->chan->chan->chan;
-       return ring_buffer_close_wait_fd(&chan->backend.config,
+       return ring_buffer_stream_close_wait_fd(&chan->backend.config,
                        chan, stream->handle, stream->cpu);
 }
 
@@ -907,7 +957,7 @@ int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream)
        struct channel *chan;
 
        chan = stream->chan->chan->chan;
-       return ring_buffer_close_wakeup_fd(&chan->backend.config,
+       return ring_buffer_stream_close_wakeup_fd(&chan->backend.config,
                        chan, stream->handle, stream->cpu);
 }
 
@@ -968,7 +1018,23 @@ void ustctl_destroy_stream(struct ustctl_consumer_stream *stream)
        free(stream);
 }
 
-int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream)
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *chan)
+{
+       if (!chan)
+               return -EINVAL;
+       return shm_get_wait_fd(chan->chan->handle,
+               &chan->chan->handle->chan._ref);
+}
+
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *chan)
+{
+       if (!chan)
+               return -EINVAL;
+       return shm_get_wakeup_fd(chan->chan->handle,
+               &chan->chan->handle->chan._ref);
+}
+
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream)
 {
        struct lttng_ust_lib_ring_buffer *buf;
        struct ustctl_consumer_channel *consumer_chan;
@@ -980,7 +1046,7 @@ int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream)
        return shm_get_wait_fd(consumer_chan->chan->handle, &buf->self._ref);
 }
 
-int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream)
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream)
 {
        struct lttng_ust_lib_ring_buffer *buf;
        struct ustctl_consumer_channel *consumer_chan;
index 70ec22aa1ac8362ad244ed1c92caa3d032bcb244..f26ab5c42907f6a71aab27620e7ecbea10465e07 100644 (file)
@@ -403,11 +403,13 @@ int lttng_abi_map_channel(int session_objd,
        struct channel *chan;
        struct lttng_ust_lib_ring_buffer_config *config;
        void *chan_data;
+       int wakeup_fd;
        uint64_t len;
        int ret;
        enum lttng_ust_chan_type type;
 
        chan_data = uargs->channel.chan_data;
+       wakeup_fd = uargs->channel.wakeup_fd;
        len = ust_chan->len;
        type = ust_chan->type;
 
@@ -415,7 +417,8 @@ int lttng_abi_map_channel(int session_objd,
        case LTTNG_UST_CHAN_PER_CPU:
                break;
        default:
-               return -EINVAL;
+               ret = -EINVAL;
+               goto invalid;
        }
 
        if (session->been_active) {
@@ -423,7 +426,7 @@ int lttng_abi_map_channel(int session_objd,
                goto active;    /* Refuse to add channel to active session */
        }
 
-       channel_handle = channel_handle_create(chan_data, len);
+       channel_handle = channel_handle_create(chan_data, len, wakeup_fd);
        if (!channel_handle) {
                ret = -EINVAL;
                goto handle_error;
@@ -496,13 +499,30 @@ int lttng_abi_map_channel(int session_objd,
        objd_ref(session_objd);
        return chan_objd;
 
+       /* error path after channel was created */
 objd_error:
 notransport:
        free(lttng_chan);
 alloc_error:
        channel_destroy(chan, channel_handle, 0);
+       return ret;
+
+       /*
+        * error path before channel creation (owning chan_data and
+        * wakeup_fd).
+        */
 handle_error:
 active:
+invalid:
+       {
+               int close_ret;
+
+               close_ret = close(wakeup_fd);
+               if (close_ret) {
+                       PERROR("close");
+               }
+       }
+       free(chan_data);
        return ret;
 }
 
index 096a22fca27caea8297dc6484f5e354aa9ae8017..d91f4bd255a993fc1f8699a3deed4f1cf8a6dba9 100644 (file)
@@ -467,9 +467,11 @@ int handle_message(struct sock_info *sock_info,
        case LTTNG_UST_CHANNEL:
        {
                void *chan_data;
+               int wakeup_fd;
 
                len = ustcomm_recv_channel_from_sessiond(sock,
-                               &chan_data, lum->u.channel.len);
+                               &chan_data, lum->u.channel.len,
+                               &wakeup_fd);
                switch (len) {
                case 0: /* orderly shutdown */
                        ret = 0;
@@ -494,6 +496,7 @@ int handle_message(struct sock_info *sock_info,
                        }
                }
                args.channel.chan_data = chan_data;
+               args.channel.wakeup_fd = wakeup_fd;
                if (ops->cmd)
                        ret = ops->cmd(lum->handle, lum->cmd,
                                        (unsigned long) &lum->u,
index 2eda6e945b218c8b557886084202feb4b5151716..89613d4cc64823a943ad3f42d17be83e0140d3c9 100644 (file)
@@ -94,12 +94,20 @@ extern struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
                                int *wakeup_fd,
                                uint64_t *memory_map_size);
 extern
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle);
+extern
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle);
+extern
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                struct channel *chan,
                struct lttng_ust_shm_handle *handle,
                int cpu);
 extern
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                struct channel *chan,
                struct lttng_ust_shm_handle *handle,
                int cpu);
index a01ebbbf8ae8931a937dac9c4a7ec19ba29ea4c8..5d1bc4adbcdf37007269b8b6e3f49db0b1c9688f 100644 (file)
@@ -753,7 +753,8 @@ error_table_alloc:
 }
 
 struct lttng_ust_shm_handle *channel_handle_create(void *data,
-                                       uint64_t memory_map_size)
+                                       uint64_t memory_map_size,
+                                       int wakeup_fd)
 {
        struct lttng_ust_shm_handle *handle;
        struct shm_object *object;
@@ -768,7 +769,7 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data,
                goto error_table_alloc;
        /* Add channel object */
        object = shm_object_table_append_mem(handle->table, data,
-                       memory_map_size);
+                       memory_map_size, wakeup_fd);
        if (!object)
                goto error_table_object;
        /* struct channel is at object 0, offset 0 (hardcoded) */
@@ -868,7 +869,27 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
        return shmp(handle, chan->backend.buf[cpu].shmp);
 }
 
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle)
+{
+       struct shm_ref *ref;
+
+       ref = &handle->chan._ref;
+       return shm_close_wait_fd(handle, ref);
+}
+
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle)
+{
+       struct shm_ref *ref;
+
+       ref = &handle->chan._ref;
+       return shm_close_wakeup_fd(handle, ref);
+}
+
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                        struct channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
@@ -885,7 +906,7 @@ int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *con
        return shm_close_wait_fd(handle, ref);
 }
 
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                        struct channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
index 85b1e4b7e6ce9b1c4ef1fc61e04a17dfb554c85a..90160ce1976c7c7c5a2853eca93229c9b3871460 100644 (file)
@@ -231,6 +231,7 @@ struct shm_object *_shm_object_table_alloc_mem(struct shm_object_table *table,
 {
        struct shm_object *obj;
        void *memory_map;
+       int waitfd[2], i, ret;
 
        if (table->allocated_len >= table->size)
                return NULL;
@@ -240,8 +241,28 @@ struct shm_object *_shm_object_table_alloc_mem(struct shm_object_table *table,
        if (!memory_map)
                goto alloc_error;
 
-       obj->wait_fd[0] = -1;
-       obj->wait_fd[1] = -1;
+       /* wait_fd: create pipe */
+       ret = pipe(waitfd);
+       if (ret < 0) {
+               PERROR("pipe");
+               goto error_pipe;
+       }
+       for (i = 0; i < 2; i++) {
+               ret = fcntl(waitfd[i], F_SETFD, FD_CLOEXEC);
+               if (ret < 0) {
+                       PERROR("fcntl");
+                       goto error_fcntl;
+               }
+       }
+       /* The write end of the pipe needs to be non-blocking */
+       ret = fcntl(waitfd[1], F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               PERROR("fcntl");
+               goto error_fcntl;
+       }
+       memcpy(obj->wait_fd, waitfd, sizeof(waitfd));
+
+       /* no shm_fd */
        obj->shm_fd = -1;
 
        obj->type = SHM_OBJECT_MEM;
@@ -252,6 +273,16 @@ struct shm_object *_shm_object_table_alloc_mem(struct shm_object_table *table,
 
        return obj;
 
+error_fcntl:
+       for (i = 0; i < 2; i++) {
+               ret = close(waitfd[i]);
+               if (ret) {
+                       PERROR("close");
+                       assert(0);
+               }
+       }
+error_pipe:
+       free(memory_map);
 alloc_error:
        return NULL;
 }
@@ -328,18 +359,31 @@ error_mmap:
  * Passing ownership of mem to object.
  */
 struct shm_object *shm_object_table_append_mem(struct shm_object_table *table,
-                       void *mem, size_t memory_map_size)
+                       void *mem, size_t memory_map_size, int wakeup_fd)
 {
        struct shm_object *obj;
+       int ret;
 
        if (table->allocated_len >= table->size)
                return NULL;
        obj = &table->objects[table->allocated_len];
 
-       obj->wait_fd[0] = -1;
-       obj->wait_fd[1] = -1;
+       obj->wait_fd[0] = -1;   /* read end is unset */
+       obj->wait_fd[1] = wakeup_fd;
        obj->shm_fd = -1;
 
+       ret = fcntl(obj->wait_fd[1], F_SETFD, FD_CLOEXEC);
+       if (ret < 0) {
+               PERROR("fcntl");
+               goto error_fcntl;
+       }
+       /* The write end of the pipe needs to be non-blocking */
+       ret = fcntl(obj->wait_fd[1], F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               PERROR("fcntl");
+               goto error_fcntl;
+       }
+
        obj->type = SHM_OBJECT_MEM;
        obj->memory_map = mem;
        obj->memory_map_size = memory_map_size;
@@ -347,6 +391,9 @@ struct shm_object *shm_object_table_append_mem(struct shm_object_table *table,
        obj->index = table->allocated_len++;
 
        return obj;
+
+error_fcntl:
+       return NULL;
 }
 
 static
@@ -379,8 +426,21 @@ void shmp_object_destroy(struct shm_object *obj)
                break;
        }
        case SHM_OBJECT_MEM:
+       {
+               int ret, i;
+
+               for (i = 0; i < 2; i++) {
+                       if (obj->wait_fd[i] < 0)
+                               continue;
+                       ret = close(obj->wait_fd[i]);
+                       if (ret) {
+                               PERROR("close");
+                               assert(0);
+                       }
+               }
                free(obj->memory_map);
                break;
+       }
        default:
                assert(0);
        }
index a1ddc4cdf2b06dc27f1d1bb55ecea7af339b89ad..8d9d1136469d580c6cec5f1c37af639ce73ef6e0 100644 (file)
@@ -29,7 +29,7 @@
 /* channel_handle_create - for UST. */
 extern
 struct lttng_ust_shm_handle *channel_handle_create(void *data,
-                               uint64_t memory_map_size);
+                               uint64_t memory_map_size, int wakeup_fd);
 /* channel_handle_add_stream - for UST. */
 extern
 int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
@@ -94,7 +94,7 @@ struct shm_object *shm_object_table_append_shm(struct shm_object_table *table,
                        size_t memory_map_size);
 /* mem ownership is passed to shm_object_table_append_mem(). */
 struct shm_object *shm_object_table_append_mem(struct shm_object_table *table,
-                       void *mem, size_t memory_map_size);
+                       void *mem, size_t memory_map_size, int wakeup_fd);
 void shm_object_table_destroy(struct shm_object_table *table);
 
 /*
This page took 0.033466 seconds and 4 git commands to generate.