Introduce channel timer lock
[lttng-tools.git] / src / common / consumer-stream.c
... / ...
CommitLineData
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
22#include <inttypes.h>
23#include <sys/mman.h>
24#include <unistd.h>
25
26#include <common/common.h>
27#include <common/relayd/relayd.h>
28#include <common/ust-consumer/ust-consumer.h>
29
30#include "consumer-stream.h"
31
32/*
33 * RCU call to free stream. MUST only be used with call_rcu().
34 */
35static void free_stream_rcu(struct rcu_head *head)
36{
37 struct lttng_ht_node_u64 *node =
38 caa_container_of(head, struct lttng_ht_node_u64, head);
39 struct lttng_consumer_stream *stream =
40 caa_container_of(node, struct lttng_consumer_stream, node);
41
42 pthread_mutex_destroy(&stream->lock);
43 free(stream);
44}
45
46/*
47 * Close stream on the relayd side. This call can destroy a relayd if the
48 * conditions are met.
49 *
50 * A RCU read side lock MUST be acquired if the relayd object was looked up in
51 * a hash table before calling this.
52 */
53void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
54 struct consumer_relayd_sock_pair *relayd)
55{
56 int ret;
57
58 assert(stream);
59 assert(relayd);
60
61 if (stream->sent_to_relayd) {
62 uatomic_dec(&relayd->refcount);
63 assert(uatomic_read(&relayd->refcount) >= 0);
64 }
65
66 /* Closing streams requires to lock the control socket. */
67 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
68 ret = relayd_send_close_stream(&relayd->control_sock,
69 stream->relayd_stream_id,
70 stream->next_net_seq_num - 1);
71 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
72 if (ret < 0) {
73 DBG("Unable to close stream on the relayd. Continuing");
74 /*
75 * Continue here. There is nothing we can do for the relayd.
76 * Chances are that the relayd has closed the socket so we just
77 * continue cleaning up.
78 */
79 }
80
81 /* Both conditions are met, we destroy the relayd. */
82 if (uatomic_read(&relayd->refcount) == 0 &&
83 uatomic_read(&relayd->destroy_flag)) {
84 consumer_destroy_relayd(relayd);
85 }
86 stream->net_seq_idx = (uint64_t) -1ULL;
87 stream->sent_to_relayd = 0;
88}
89
90/*
91 * Close stream's file descriptors and, if needed, close stream also on the
92 * relayd side.
93 *
94 * The consumer data lock MUST be acquired.
95 * The stream lock MUST be acquired.
96 */
97void consumer_stream_close(struct lttng_consumer_stream *stream)
98{
99 int ret;
100 struct consumer_relayd_sock_pair *relayd;
101
102 assert(stream);
103
104 switch (consumer_data.type) {
105 case LTTNG_CONSUMER_KERNEL:
106 if (stream->mmap_base != NULL) {
107 ret = munmap(stream->mmap_base, stream->mmap_len);
108 if (ret != 0) {
109 PERROR("munmap");
110 }
111 }
112
113 if (stream->wait_fd >= 0) {
114 ret = close(stream->wait_fd);
115 if (ret) {
116 PERROR("close");
117 }
118 stream->wait_fd = -1;
119 }
120 break;
121 case LTTNG_CONSUMER32_UST:
122 case LTTNG_CONSUMER64_UST:
123 break;
124 default:
125 ERR("Unknown consumer_data type");
126 assert(0);
127 }
128
129 /* Close output fd. Could be a socket or local file at this point. */
130 if (stream->out_fd >= 0) {
131 ret = close(stream->out_fd);
132 if (ret) {
133 PERROR("close");
134 }
135 stream->out_fd = -1;
136 }
137
138 /* Check and cleanup relayd if needed. */
139 rcu_read_lock();
140 relayd = consumer_find_relayd(stream->net_seq_idx);
141 if (relayd != NULL) {
142 consumer_stream_relayd_close(stream, relayd);
143 }
144 rcu_read_unlock();
145}
146
147/*
148 * Delete the stream from all possible hash tables.
149 *
150 * The consumer data lock MUST be acquired.
151 * The stream lock MUST be acquired.
152 */
153void consumer_stream_delete(struct lttng_consumer_stream *stream,
154 struct lttng_ht *ht)
155{
156 int ret;
157 struct lttng_ht_iter iter;
158
159 assert(stream);
160 /* Should NEVER be called not in monitor mode. */
161 assert(stream->chan->monitor);
162
163 rcu_read_lock();
164
165 if (ht) {
166 iter.iter.node = &stream->node.node;
167 ret = lttng_ht_del(ht, &iter);
168 assert(!ret);
169 }
170
171 /* Delete from stream per channel ID hash table. */
172 iter.iter.node = &stream->node_channel_id.node;
173 /*
174 * The returned value is of no importance. Even if the node is NOT in the
175 * hash table, we continue since we may have been called by a code path
176 * that did not add the stream to a (all) hash table. Same goes for the
177 * next call ht del call.
178 */
179 (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
180
181 /* Delete from the global stream list. */
182 iter.iter.node = &stream->node_session_id.node;
183 /* See the previous ht del on why we ignore the returned value. */
184 (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
185
186 rcu_read_unlock();
187
188 /* Decrement the stream count of the global consumer data. */
189 assert(consumer_data.stream_count > 0);
190 consumer_data.stream_count--;
191}
192
193/*
194 * Free the given stream within a RCU call.
195 */
196void consumer_stream_free(struct lttng_consumer_stream *stream)
197{
198 assert(stream);
199
200 call_rcu(&stream->node.head, free_stream_rcu);
201}
202
203/*
204 * Destroy the stream's buffers of the tracer.
205 */
206void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
207{
208 assert(stream);
209
210 switch (consumer_data.type) {
211 case LTTNG_CONSUMER_KERNEL:
212 break;
213 case LTTNG_CONSUMER32_UST:
214 case LTTNG_CONSUMER64_UST:
215 lttng_ustconsumer_del_stream(stream);
216 break;
217 default:
218 ERR("Unknown consumer_data type");
219 assert(0);
220 }
221}
222
223/*
224 * Destroy and close a already created stream.
225 */
226static void destroy_close_stream(struct lttng_consumer_stream *stream)
227{
228 assert(stream);
229
230 DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
231
232 /* Destroy tracer buffers of the stream. */
233 consumer_stream_destroy_buffers(stream);
234 /* Close down everything including the relayd if one. */
235 consumer_stream_close(stream);
236}
237
238/*
239 * Decrement the stream's channel refcount and if down to 0, return the channel
240 * pointer so it can be destroyed by the caller or NULL if not.
241 */
242static struct lttng_consumer_channel *unref_channel(
243 struct lttng_consumer_stream *stream)
244{
245 struct lttng_consumer_channel *free_chan = NULL;
246
247 assert(stream);
248 assert(stream->chan);
249
250 /* Update refcount of channel and see if we need to destroy it. */
251 if (!uatomic_sub_return(&stream->chan->refcount, 1)
252 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
253 free_chan = stream->chan;
254 }
255
256 return free_chan;
257}
258
259/*
260 * Destroy a stream completely. This will delete, close and free the stream.
261 * Once return, the stream is NO longer usable. Its channel may get destroyed
262 * if conditions are met for a monitored stream.
263 *
264 * This MUST be called WITHOUT the consumer data and stream lock acquired if
265 * the stream is in _monitor_ mode else it does not matter.
266 */
267void consumer_stream_destroy(struct lttng_consumer_stream *stream,
268 struct lttng_ht *ht)
269{
270 assert(stream);
271
272 /* Stream is in monitor mode. */
273 if (stream->monitor) {
274 struct lttng_consumer_channel *free_chan = NULL;
275
276 /*
277 * This means that the stream was successfully removed from the streams
278 * list of the channel and sent to the right thread managing this
279 * stream thus being globally visible.
280 */
281 if (stream->globally_visible) {
282 pthread_mutex_lock(&consumer_data.lock);
283 pthread_mutex_lock(&stream->chan->lock);
284 pthread_mutex_lock(&stream->chan->timer_lock);
285 pthread_mutex_lock(&stream->lock);
286 /* Remove every reference of the stream in the consumer. */
287 consumer_stream_delete(stream, ht);
288
289 destroy_close_stream(stream);
290
291 /* Update channel's refcount of the stream. */
292 free_chan = unref_channel(stream);
293
294 /* Indicates that the consumer data state MUST be updated after this. */
295 consumer_data.need_update = 1;
296
297 pthread_mutex_unlock(&stream->lock);
298 pthread_mutex_unlock(&stream->chan->timer_lock);
299 pthread_mutex_unlock(&stream->chan->lock);
300 pthread_mutex_unlock(&consumer_data.lock);
301 } else {
302 /*
303 * If the stream is not visible globally, this needs to be done
304 * outside of the consumer data lock section.
305 */
306 free_chan = unref_channel(stream);
307 }
308
309 if (free_chan) {
310 consumer_del_channel(free_chan);
311 }
312 } else {
313 destroy_close_stream(stream);
314 }
315
316 /* Free stream within a RCU call. */
317 consumer_stream_free(stream);
318}
This page took 0.023483 seconds and 4 git commands to generate.