Sessiond rotation thread
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
f1e16794
DG
19#include <stdio.h>
20#include <stdlib.h>
f1e16794
DG
21#include <sys/stat.h>
22#include <unistd.h>
e1f3997a 23#include <inttypes.h>
f1e16794
DG
24
25#include <common/common.h>
26#include <common/defaults.h>
f5436bfc 27#include <common/compat/string.h>
f1e16794 28
00e2e675 29#include "consumer.h"
8782cc74 30#include "health-sessiond.h"
f1e16794 31#include "kernel-consumer.h"
e9404c27
JG
32#include "notification-thread-commands.h"
33#include "session.h"
34#include "lttng-sessiond.h"
f1e16794 35
2bba9e53
DG
36static char *create_channel_path(struct consumer_output *consumer,
37 uid_t uid, gid_t gid)
00e2e675
DG
38{
39 int ret;
ffe60014 40 char tmp_path[PATH_MAX];
2bba9e53 41 char *pathname = NULL;
00e2e675 42
2bba9e53 43 assert(consumer);
00e2e675 44
ffe60014
DG
45 /* Get the right path name destination */
46 if (consumer->type == CONSUMER_DST_LOCAL) {
47 /* Set application path to the destination path */
366a9222
JD
48 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s%s",
49 consumer->dst.session_root_path,
50 consumer->chunk_path,
51 consumer->subdir);
ffe60014 52 if (ret < 0) {
2bba9e53 53 PERROR("snprintf kernel channel path");
ffe60014
DG
54 goto error;
55 }
f5436bfc 56 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 57 if (!pathname) {
f5436bfc 58 PERROR("lttng_strndup");
bb3c4e70
MD
59 goto error;
60 }
ffe60014
DG
61
62 /* Create directory */
2bba9e53 63 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, uid, gid);
ffe60014 64 if (ret < 0) {
df5b86c8 65 if (errno != EEXIST) {
ffe60014
DG
66 ERR("Trace directory creation error");
67 goto error;
68 }
69 }
70 DBG3("Kernel local consumer tracefile path: %s", pathname);
71 } else {
2b29c638
JD
72 ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
73 consumer->dst.net.base_dir,
74 consumer->subdir);
ffe60014 75 if (ret < 0) {
2bba9e53 76 PERROR("snprintf kernel metadata path");
ffe60014
DG
77 goto error;
78 }
f5436bfc 79 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 80 if (!pathname) {
f5436bfc 81 PERROR("lttng_strndup");
bb3c4e70
MD
82 goto error;
83 }
ffe60014
DG
84 DBG3("Kernel network consumer subdir path: %s", pathname);
85 }
86
2bba9e53
DG
87 return pathname;
88
89error:
90 free(pathname);
91 return NULL;
92}
93
94/*
95 * Sending a single channel to the consumer with command ADD_CHANNEL.
96 */
97int kernel_consumer_add_channel(struct consumer_socket *sock,
e9404c27
JG
98 struct ltt_kernel_channel *channel,
99 struct ltt_kernel_session *ksession,
2bba9e53
DG
100 unsigned int monitor)
101{
102 int ret;
103 char *pathname;
104 struct lttcomm_consumer_msg lkm;
105 struct consumer_output *consumer;
e9404c27
JG
106 enum lttng_error_code status;
107 struct ltt_session *session;
108 struct lttng_channel_extended *channel_attr_extended;
2bba9e53
DG
109
110 /* Safety net */
111 assert(channel);
e9404c27
JG
112 assert(ksession);
113 assert(ksession->consumer);
2bba9e53 114
e9404c27
JG
115 consumer = ksession->consumer;
116 channel_attr_extended = (struct lttng_channel_extended *)
117 channel->channel->attr.extended.ptr;
2bba9e53
DG
118
119 DBG("Kernel consumer adding channel %s to kernel consumer",
120 channel->channel->name);
121
122 if (monitor) {
e9404c27
JG
123 pathname = create_channel_path(consumer, ksession->uid,
124 ksession->gid);
2bba9e53
DG
125 } else {
126 /* Empty path. */
53efb85a 127 pathname = strdup("");
2bba9e53 128 }
bb3c4e70
MD
129 if (!pathname) {
130 ret = -1;
131 goto error;
132 }
2bba9e53 133
00e2e675
DG
134 /* Prep channel message structure */
135 consumer_init_channel_comm_msg(&lkm,
136 LTTNG_CONSUMER_ADD_CHANNEL,
e1f3997a 137 channel->key,
e9404c27 138 ksession->id,
ffe60014 139 pathname,
e9404c27
JG
140 ksession->uid,
141 ksession->gid,
ffe60014 142 consumer->net_seq_index,
c30aaa51 143 channel->channel->name,
ffe60014
DG
144 channel->stream_count,
145 channel->channel->attr.output,
1624d5b7
JD
146 CONSUMER_CHANNEL_TYPE_DATA,
147 channel->channel->attr.tracefile_size,
2bba9e53 148 channel->channel->attr.tracefile_count,
ecc48a90 149 monitor,
e9404c27
JG
150 channel->channel->attr.live_timer_interval,
151 channel_attr_extended->monitor_timer_interval);
00e2e675 152
840cb59c 153 health_code_update();
ca03de58 154
00e2e675
DG
155 ret = consumer_send_channel(sock, &lkm);
156 if (ret < 0) {
157 goto error;
158 }
159
840cb59c 160 health_code_update();
e9404c27
JG
161 rcu_read_lock();
162 session = session_find_by_id(ksession->id);
163 assert(session);
ca03de58 164
e9404c27
JG
165 status = notification_thread_command_add_channel(
166 notification_thread_handle, session->name,
167 ksession->uid, ksession->gid,
e1f3997a 168 channel->channel->name, channel->key,
e9404c27
JG
169 LTTNG_DOMAIN_KERNEL,
170 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
171 rcu_read_unlock();
172 if (status != LTTNG_OK) {
173 ret = -1;
174 goto error;
175 }
753873bf
JR
176
177 channel->published_to_notification_thread = true;
178
00e2e675 179error:
53efb85a 180 free(pathname);
00e2e675
DG
181 return ret;
182}
183
184/*
185 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
186 *
187 * The consumer socket lock must be held by the caller.
00e2e675 188 */
f50f23d9 189int kernel_consumer_add_metadata(struct consumer_socket *sock,
2bba9e53 190 struct ltt_kernel_session *session, unsigned int monitor)
00e2e675
DG
191{
192 int ret;
2bba9e53 193 char *pathname;
00e2e675 194 struct lttcomm_consumer_msg lkm;
a7d9a3e7 195 struct consumer_output *consumer;
00e2e675
DG
196
197 /* Safety net */
198 assert(session);
199 assert(session->consumer);
f50f23d9 200 assert(sock);
00e2e675
DG
201
202 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
203
204 /* Get consumer output pointer */
a7d9a3e7 205 consumer = session->consumer;
00e2e675 206
2bba9e53
DG
207 if (monitor) {
208 pathname = create_channel_path(consumer, session->uid, session->gid);
00e2e675 209 } else {
2bba9e53 210 /* Empty path. */
53efb85a 211 pathname = strdup("");
00e2e675 212 }
bb3c4e70
MD
213 if (!pathname) {
214 ret = -1;
215 goto error;
216 }
00e2e675
DG
217
218 /* Prep channel message structure */
219 consumer_init_channel_comm_msg(&lkm,
220 LTTNG_CONSUMER_ADD_CHANNEL,
d40f0359 221 session->metadata->key,
ffe60014
DG
222 session->id,
223 pathname,
224 session->uid,
225 session->gid,
226 consumer->net_seq_index,
30079b6b 227 DEFAULT_METADATA_NAME,
ffe60014
DG
228 1,
229 DEFAULT_KERNEL_CHANNEL_OUTPUT,
1624d5b7 230 CONSUMER_CHANNEL_TYPE_METADATA,
2bba9e53 231 0, 0,
e9404c27 232 monitor, 0, 0);
00e2e675 233
840cb59c 234 health_code_update();
ca03de58 235
00e2e675
DG
236 ret = consumer_send_channel(sock, &lkm);
237 if (ret < 0) {
238 goto error;
239 }
240
840cb59c 241 health_code_update();
ca03de58 242
00e2e675
DG
243 /* Prep stream message structure */
244 consumer_init_stream_comm_msg(&lkm,
245 LTTNG_CONSUMER_ADD_STREAM,
d40f0359 246 session->metadata->key,
00e2e675 247 session->metadata_stream_fd,
1624d5b7 248 0); /* CPU: 0 for metadata. */
00e2e675 249
840cb59c 250 health_code_update();
ca03de58 251
00e2e675 252 /* Send stream and file descriptor */
a7d9a3e7 253 ret = consumer_send_stream(sock, consumer, &lkm,
00e2e675
DG
254 &session->metadata_stream_fd, 1);
255 if (ret < 0) {
256 goto error;
257 }
258
840cb59c 259 health_code_update();
ca03de58 260
00e2e675 261error:
53efb85a 262 free(pathname);
00e2e675
DG
263 return ret;
264}
265
266/*
267 * Sending a single stream to the consumer with command ADD_STREAM.
268 */
f50f23d9
DG
269int kernel_consumer_add_stream(struct consumer_socket *sock,
270 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
2bba9e53 271 struct ltt_kernel_session *session, unsigned int monitor)
00e2e675
DG
272{
273 int ret;
00e2e675 274 struct lttcomm_consumer_msg lkm;
a7d9a3e7 275 struct consumer_output *consumer;
00e2e675
DG
276
277 assert(channel);
278 assert(stream);
279 assert(session);
280 assert(session->consumer);
f50f23d9 281 assert(sock);
00e2e675
DG
282
283 DBG("Sending stream %d of channel %s to kernel consumer",
284 stream->fd, channel->channel->name);
285
286 /* Get consumer output pointer */
a7d9a3e7 287 consumer = session->consumer;
00e2e675 288
00e2e675 289 /* Prep stream consumer message */
ffe60014
DG
290 consumer_init_stream_comm_msg(&lkm,
291 LTTNG_CONSUMER_ADD_STREAM,
e1f3997a 292 channel->key,
00e2e675 293 stream->fd,
ffe60014 294 stream->cpu);
00e2e675 295
840cb59c 296 health_code_update();
ca03de58 297
00e2e675 298 /* Send stream and file descriptor */
a7d9a3e7 299 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
300 if (ret < 0) {
301 goto error;
302 }
303
840cb59c 304 health_code_update();
ca03de58 305
00e2e675
DG
306error:
307 return ret;
308}
309
a4baae1b
JD
310/*
311 * Sending the notification that all streams were sent with STREAMS_SENT.
312 */
313int kernel_consumer_streams_sent(struct consumer_socket *sock,
314 struct ltt_kernel_session *session, uint64_t channel_key)
315{
316 int ret;
317 struct lttcomm_consumer_msg lkm;
318 struct consumer_output *consumer;
319
320 assert(sock);
321 assert(session);
322
323 DBG("Sending streams_sent");
324 /* Get consumer output pointer */
325 consumer = session->consumer;
326
327 /* Prep stream consumer message */
328 consumer_init_streams_sent_comm_msg(&lkm,
329 LTTNG_CONSUMER_STREAMS_SENT,
330 channel_key, consumer->net_seq_index);
331
332 health_code_update();
333
334 /* Send stream and file descriptor */
335 ret = consumer_send_msg(sock, &lkm);
336 if (ret < 0) {
337 goto error;
338 }
339
340error:
341 return ret;
342}
343
f1e16794
DG
344/*
345 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
346 *
347 * The consumer socket lock must be held by the caller.
f1e16794 348 */
f50f23d9 349int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
2bba9e53
DG
350 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
351 unsigned int monitor)
f1e16794 352{
e99f9447 353 int ret = LTTNG_OK;
f1e16794 354 struct ltt_kernel_stream *stream;
00e2e675
DG
355
356 /* Safety net */
357 assert(channel);
358 assert(session);
359 assert(session->consumer);
f50f23d9 360 assert(sock);
00e2e675
DG
361
362 /* Bail out if consumer is disabled */
363 if (!session->consumer->enabled) {
f73fabfd 364 ret = LTTNG_OK;
00e2e675
DG
365 goto error;
366 }
f1e16794
DG
367
368 DBG("Sending streams of channel %s to kernel consumer",
369 channel->channel->name);
370
e99f9447
MD
371 if (!channel->sent_to_consumer) {
372 ret = kernel_consumer_add_channel(sock, channel, session, monitor);
373 if (ret < 0) {
374 goto error;
375 }
376 channel->sent_to_consumer = true;
f1e16794
DG
377 }
378
379 /* Send streams */
380 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
6986ab9b 381 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
382 continue;
383 }
00e2e675
DG
384
385 /* Add stream on the kernel consumer side. */
2bba9e53
DG
386 ret = kernel_consumer_add_stream(sock, channel, stream, session,
387 monitor);
f1e16794 388 if (ret < 0) {
f1e16794
DG
389 goto error;
390 }
6986ab9b 391 stream->sent_to_consumer = true;
f1e16794
DG
392 }
393
f1e16794
DG
394error:
395 return ret;
396}
397
398/*
399 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
400 *
401 * The consumer socket lock must be held by the caller.
f1e16794 402 */
f50f23d9
DG
403int kernel_consumer_send_session(struct consumer_socket *sock,
404 struct ltt_kernel_session *session)
f1e16794 405{
2bba9e53 406 int ret, monitor = 0;
f1e16794 407 struct ltt_kernel_channel *chan;
f1e16794 408
00e2e675
DG
409 /* Safety net */
410 assert(session);
411 assert(session->consumer);
f50f23d9 412 assert(sock);
f1e16794 413
00e2e675
DG
414 /* Bail out if consumer is disabled */
415 if (!session->consumer->enabled) {
f73fabfd 416 ret = LTTNG_OK;
00e2e675 417 goto error;
f1e16794
DG
418 }
419
2bba9e53
DG
420 /* Don't monitor the streams on the consumer if in flight recorder. */
421 if (session->output_traces) {
422 monitor = 1;
423 }
424
00e2e675
DG
425 DBG("Sending session stream to kernel consumer");
426
609af759 427 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 428 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 429 if (ret < 0) {
f1e16794
DG
430 goto error;
431 }
f1e16794
DG
432 }
433
00e2e675 434 /* Send channel and streams of it */
f1e16794 435 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
2bba9e53
DG
436 ret = kernel_consumer_send_channel_stream(sock, chan, session,
437 monitor);
f1e16794
DG
438 if (ret < 0) {
439 goto error;
440 }
601262d6
JD
441 if (monitor) {
442 /*
443 * Inform the relay that all the streams for the
444 * channel were sent.
445 */
e1f3997a 446 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
447 if (ret < 0) {
448 goto error;
449 }
450 }
f1e16794
DG
451 }
452
00e2e675 453 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 454
4ce9ff51 455 session->consumer_fds_sent = 1;
f1e16794
DG
456 return 0;
457
458error:
459 return ret;
460}
07b86b52
JD
461
462int kernel_consumer_destroy_channel(struct consumer_socket *socket,
463 struct ltt_kernel_channel *channel)
464{
465 int ret;
466 struct lttcomm_consumer_msg msg;
467
468 assert(channel);
469 assert(socket);
07b86b52 470
e1f3997a 471 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 472
53efb85a 473 memset(&msg, 0, sizeof(msg));
07b86b52 474 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 475 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
476
477 pthread_mutex_lock(socket->lock);
478 health_code_update();
479
480 ret = consumer_send_msg(socket, &msg);
481 if (ret < 0) {
482 goto error;
483 }
484
485error:
486 health_code_update();
487 pthread_mutex_unlock(socket->lock);
488 return ret;
489}
490
491int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
492 struct ltt_kernel_metadata *metadata)
493{
494 int ret;
495 struct lttcomm_consumer_msg msg;
496
497 assert(metadata);
498 assert(socket);
07b86b52 499
d40f0359 500 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 501
53efb85a 502 memset(&msg, 0, sizeof(msg));
07b86b52 503 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 504 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
505
506 pthread_mutex_lock(socket->lock);
507 health_code_update();
508
509 ret = consumer_send_msg(socket, &msg);
510 if (ret < 0) {
511 goto error;
512 }
513
514error:
515 health_code_update();
516 pthread_mutex_unlock(socket->lock);
517 return ret;
518}
This page took 0.068233 seconds and 4 git commands to generate.