consumerd: tag metadata channel as being part of a live session
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23 #include <inttypes.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
28
29 #include "consumer.h"
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
33 #include "session.h"
34 #include "lttng-sessiond.h"
35
36 static char *create_channel_path(struct consumer_output *consumer)
37 {
38 int ret;
39 char tmp_path[PATH_MAX];
40 char *pathname = NULL;
41
42 assert(consumer);
43
44 /* Get the right path name destination */
45 if (consumer->type == CONSUMER_DST_LOCAL ||
46 (consumer->type == CONSUMER_DST_NET &&
47 consumer->relay_major_version == 2 &&
48 consumer->relay_minor_version >= 11)) {
49 pathname = strdup(consumer->domain_subdir);
50 if (!pathname) {
51 PERROR("Failed to copy domain subdirectory string %s",
52 consumer->domain_subdir);
53 goto error;
54 }
55 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
56 pathname);
57 } else {
58 /* Network output. */
59 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
60 consumer->dst.net.base_dir,
61 consumer->domain_subdir);
62 if (ret < 0) {
63 PERROR("snprintf kernel metadata path");
64 goto error;
65 } else if (ret >= sizeof(tmp_path)) {
66 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
67 sizeof(tmp_path), ret,
68 consumer->dst.net.base_dir,
69 consumer->domain_subdir);
70 goto error;
71 }
72 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
73 if (!pathname) {
74 PERROR("lttng_strndup");
75 goto error;
76 }
77 DBG3("Kernel network consumer subdir path: %s", pathname);
78 }
79
80 return pathname;
81
82 error:
83 free(pathname);
84 return NULL;
85 }
86
87 /*
88 * Sending a single channel to the consumer with command ADD_CHANNEL.
89 */
90 static
91 int kernel_consumer_add_channel(struct consumer_socket *sock,
92 struct ltt_kernel_channel *channel,
93 struct ltt_kernel_session *ksession,
94 unsigned int monitor)
95 {
96 int ret;
97 char *pathname = NULL;
98 struct lttcomm_consumer_msg lkm;
99 struct consumer_output *consumer;
100 enum lttng_error_code status;
101 struct ltt_session *session = NULL;
102 struct lttng_channel_extended *channel_attr_extended;
103 bool is_local_trace;
104
105 /* Safety net */
106 assert(channel);
107 assert(ksession);
108 assert(ksession->consumer);
109
110 consumer = ksession->consumer;
111 channel_attr_extended = (struct lttng_channel_extended *)
112 channel->channel->attr.extended.ptr;
113
114 DBG("Kernel consumer adding channel %s to kernel consumer",
115 channel->channel->name);
116 is_local_trace = consumer->net_seq_index == -1ULL;
117
118 pathname = create_channel_path(consumer);
119 if (!pathname) {
120 ret = -1;
121 goto error;
122 }
123
124 if (is_local_trace && ksession->current_trace_chunk) {
125 enum lttng_trace_chunk_status chunk_status;
126 char *pathname_index;
127
128 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
129 pathname);
130 if (ret < 0) {
131 ERR("Failed to format channel index directory");
132 ret = -1;
133 goto error;
134 }
135
136 /*
137 * Create the index subdirectory which will take care
138 * of implicitly creating the channel's path.
139 */
140 chunk_status = lttng_trace_chunk_create_subdirectory(
141 ksession->current_trace_chunk, pathname_index);
142 free(pathname_index);
143 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
144 ret = -1;
145 goto error;
146 }
147 }
148
149 /* Prep channel message structure */
150 consumer_init_add_channel_comm_msg(&lkm,
151 channel->key,
152 ksession->id,
153 pathname,
154 ksession->uid,
155 ksession->gid,
156 consumer->net_seq_index,
157 channel->channel->name,
158 channel->stream_count,
159 channel->channel->attr.output,
160 CONSUMER_CHANNEL_TYPE_DATA,
161 channel->channel->attr.tracefile_size,
162 channel->channel->attr.tracefile_count,
163 monitor,
164 channel->channel->attr.live_timer_interval,
165 ksession->is_live_session,
166 channel_attr_extended->monitor_timer_interval,
167 ksession->current_trace_chunk);
168
169 health_code_update();
170
171 ret = consumer_send_channel(sock, &lkm);
172 if (ret < 0) {
173 goto error;
174 }
175
176 health_code_update();
177 rcu_read_lock();
178 session = session_find_by_id(ksession->id);
179 assert(session);
180 assert(pthread_mutex_trylock(&session->lock));
181 assert(session_trylock_list());
182
183 status = notification_thread_command_add_channel(
184 notification_thread_handle, session->name,
185 ksession->uid, ksession->gid,
186 channel->channel->name, channel->key,
187 LTTNG_DOMAIN_KERNEL,
188 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
189 rcu_read_unlock();
190 if (status != LTTNG_OK) {
191 ret = -1;
192 goto error;
193 }
194
195 channel->published_to_notification_thread = true;
196
197 error:
198 if (session) {
199 session_put(session);
200 }
201 free(pathname);
202 return ret;
203 }
204
205 /*
206 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
207 *
208 * The consumer socket lock must be held by the caller.
209 */
210 int kernel_consumer_add_metadata(struct consumer_socket *sock,
211 struct ltt_kernel_session *ksession, unsigned int monitor)
212 {
213 int ret;
214 struct lttcomm_consumer_msg lkm;
215 struct consumer_output *consumer;
216
217 rcu_read_lock();
218
219 /* Safety net */
220 assert(ksession);
221 assert(ksession->consumer);
222 assert(sock);
223
224 DBG("Sending metadata %d to kernel consumer",
225 ksession->metadata_stream_fd);
226
227 /* Get consumer output pointer */
228 consumer = ksession->consumer;
229
230 /* Prep channel message structure */
231 consumer_init_add_channel_comm_msg(&lkm, ksession->metadata->key,
232 ksession->id, "", ksession->uid, ksession->gid,
233 consumer->net_seq_index, DEFAULT_METADATA_NAME, 1,
234 DEFAULT_KERNEL_CHANNEL_OUTPUT,
235 CONSUMER_CHANNEL_TYPE_METADATA, 0, 0, monitor, 0,
236 ksession->is_live_session, 0,
237 ksession->current_trace_chunk);
238
239 health_code_update();
240
241 ret = consumer_send_channel(sock, &lkm);
242 if (ret < 0) {
243 goto error;
244 }
245
246 health_code_update();
247
248 /* Prep stream message structure */
249 consumer_init_add_stream_comm_msg(&lkm,
250 ksession->metadata->key,
251 ksession->metadata_stream_fd,
252 0 /* CPU: 0 for metadata. */);
253
254 health_code_update();
255
256 /* Send stream and file descriptor */
257 ret = consumer_send_stream(sock, consumer, &lkm,
258 &ksession->metadata_stream_fd, 1);
259 if (ret < 0) {
260 goto error;
261 }
262
263 health_code_update();
264
265 error:
266 rcu_read_unlock();
267 return ret;
268 }
269
270 /*
271 * Sending a single stream to the consumer with command ADD_STREAM.
272 */
273 static
274 int kernel_consumer_add_stream(struct consumer_socket *sock,
275 struct ltt_kernel_channel *channel,
276 struct ltt_kernel_stream *stream,
277 struct ltt_kernel_session *session, unsigned int monitor)
278 {
279 int ret;
280 struct lttcomm_consumer_msg lkm;
281 struct consumer_output *consumer;
282
283 assert(channel);
284 assert(stream);
285 assert(session);
286 assert(session->consumer);
287 assert(sock);
288
289 DBG("Sending stream %d of channel %s to kernel consumer",
290 stream->fd, channel->channel->name);
291
292 /* Get consumer output pointer */
293 consumer = session->consumer;
294
295 /* Prep stream consumer message */
296 consumer_init_add_stream_comm_msg(&lkm,
297 channel->key,
298 stream->fd,
299 stream->cpu);
300
301 health_code_update();
302
303 /* Send stream and file descriptor */
304 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
305 if (ret < 0) {
306 goto error;
307 }
308
309 health_code_update();
310
311 error:
312 return ret;
313 }
314
315 /*
316 * Sending the notification that all streams were sent with STREAMS_SENT.
317 */
318 int kernel_consumer_streams_sent(struct consumer_socket *sock,
319 struct ltt_kernel_session *session, uint64_t channel_key)
320 {
321 int ret;
322 struct lttcomm_consumer_msg lkm;
323 struct consumer_output *consumer;
324
325 assert(sock);
326 assert(session);
327
328 DBG("Sending streams_sent");
329 /* Get consumer output pointer */
330 consumer = session->consumer;
331
332 /* Prep stream consumer message */
333 consumer_init_streams_sent_comm_msg(&lkm,
334 LTTNG_CONSUMER_STREAMS_SENT,
335 channel_key, consumer->net_seq_index);
336
337 health_code_update();
338
339 /* Send stream and file descriptor */
340 ret = consumer_send_msg(sock, &lkm);
341 if (ret < 0) {
342 goto error;
343 }
344
345 error:
346 return ret;
347 }
348
349 /*
350 * Send all stream fds of kernel channel to the consumer.
351 *
352 * The consumer socket lock must be held by the caller.
353 */
354 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
355 struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
356 unsigned int monitor)
357 {
358 int ret = LTTNG_OK;
359 struct ltt_kernel_stream *stream;
360
361 /* Safety net */
362 assert(channel);
363 assert(ksession);
364 assert(ksession->consumer);
365 assert(sock);
366
367 rcu_read_lock();
368
369 /* Bail out if consumer is disabled */
370 if (!ksession->consumer->enabled) {
371 ret = LTTNG_OK;
372 goto error;
373 }
374
375 DBG("Sending streams of channel %s to kernel consumer",
376 channel->channel->name);
377
378 if (!channel->sent_to_consumer) {
379 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
380 if (ret < 0) {
381 goto error;
382 }
383 channel->sent_to_consumer = true;
384 }
385
386 /* Send streams */
387 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
388 if (!stream->fd || stream->sent_to_consumer) {
389 continue;
390 }
391
392 /* Add stream on the kernel consumer side. */
393 ret = kernel_consumer_add_stream(sock, channel, stream,
394 ksession, monitor);
395 if (ret < 0) {
396 goto error;
397 }
398 stream->sent_to_consumer = true;
399 }
400
401 error:
402 rcu_read_unlock();
403 return ret;
404 }
405
406 /*
407 * Send all stream fds of the kernel session to the consumer.
408 *
409 * The consumer socket lock must be held by the caller.
410 */
411 int kernel_consumer_send_session(struct consumer_socket *sock,
412 struct ltt_kernel_session *session)
413 {
414 int ret, monitor = 0;
415 struct ltt_kernel_channel *chan;
416
417 /* Safety net */
418 assert(session);
419 assert(session->consumer);
420 assert(sock);
421
422 /* Bail out if consumer is disabled */
423 if (!session->consumer->enabled) {
424 ret = LTTNG_OK;
425 goto error;
426 }
427
428 /* Don't monitor the streams on the consumer if in flight recorder. */
429 if (session->output_traces) {
430 monitor = 1;
431 }
432
433 DBG("Sending session stream to kernel consumer");
434
435 if (session->metadata_stream_fd >= 0 && session->metadata) {
436 ret = kernel_consumer_add_metadata(sock, session, monitor);
437 if (ret < 0) {
438 goto error;
439 }
440 }
441
442 /* Send channel and streams of it */
443 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
444 ret = kernel_consumer_send_channel_streams(sock, chan, session,
445 monitor);
446 if (ret < 0) {
447 goto error;
448 }
449 if (monitor) {
450 /*
451 * Inform the relay that all the streams for the
452 * channel were sent.
453 */
454 ret = kernel_consumer_streams_sent(sock, session, chan->key);
455 if (ret < 0) {
456 goto error;
457 }
458 }
459 }
460
461 DBG("Kernel consumer FDs of metadata and channel streams sent");
462
463 session->consumer_fds_sent = 1;
464 return 0;
465
466 error:
467 return ret;
468 }
469
470 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
471 struct ltt_kernel_channel *channel)
472 {
473 int ret;
474 struct lttcomm_consumer_msg msg;
475
476 assert(channel);
477 assert(socket);
478
479 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
480
481 memset(&msg, 0, sizeof(msg));
482 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
483 msg.u.destroy_channel.key = channel->key;
484
485 pthread_mutex_lock(socket->lock);
486 health_code_update();
487
488 ret = consumer_send_msg(socket, &msg);
489 if (ret < 0) {
490 goto error;
491 }
492
493 error:
494 health_code_update();
495 pthread_mutex_unlock(socket->lock);
496 return ret;
497 }
498
499 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
500 struct ltt_kernel_metadata *metadata)
501 {
502 int ret;
503 struct lttcomm_consumer_msg msg;
504
505 assert(metadata);
506 assert(socket);
507
508 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
509
510 memset(&msg, 0, sizeof(msg));
511 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
512 msg.u.destroy_channel.key = metadata->key;
513
514 pthread_mutex_lock(socket->lock);
515 health_code_update();
516
517 ret = consumer_send_msg(socket, &msg);
518 if (ret < 0) {
519 goto error;
520 }
521
522 error:
523 health_code_update();
524 pthread_mutex_unlock(socket->lock);
525 return ret;
526 }
This page took 0.069363 seconds and 4 git commands to generate.