2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
34 #include "lttng-sessiond.h"
36 static char *create_channel_path(struct consumer_output
*consumer
,
40 char tmp_path
[PATH_MAX
];
41 char *pathname
= NULL
;
45 /* Get the right path name destination */
46 if (consumer
->type
== CONSUMER_DST_LOCAL
) {
47 /* Set application path to the destination path */
48 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s%s",
49 consumer
->dst
.session_root_path
,
53 PERROR("snprintf kernel channel path");
56 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
58 PERROR("lttng_strndup");
62 /* Create directory */
63 ret
= run_as_mkdir_recursive(pathname
, S_IRWXU
| S_IRWXG
, uid
, gid
);
65 if (errno
!= EEXIST
) {
66 ERR("Trace directory creation error");
70 DBG3("Kernel local consumer tracefile path: %s", pathname
);
72 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s",
73 consumer
->dst
.net
.base_dir
,
76 PERROR("snprintf kernel metadata path");
79 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
81 PERROR("lttng_strndup");
84 DBG3("Kernel network consumer subdir path: %s", pathname
);
95 * Sending a single channel to the consumer with command ADD_CHANNEL.
97 int kernel_consumer_add_channel(struct consumer_socket
*sock
,
98 struct ltt_kernel_channel
*channel
,
99 struct ltt_kernel_session
*ksession
,
100 unsigned int monitor
)
104 struct lttcomm_consumer_msg lkm
;
105 struct consumer_output
*consumer
;
106 enum lttng_error_code status
;
107 struct ltt_session
*session
;
108 struct lttng_channel_extended
*channel_attr_extended
;
113 assert(ksession
->consumer
);
115 consumer
= ksession
->consumer
;
116 channel_attr_extended
= (struct lttng_channel_extended
*)
117 channel
->channel
->attr
.extended
.ptr
;
119 DBG("Kernel consumer adding channel %s to kernel consumer",
120 channel
->channel
->name
);
123 pathname
= create_channel_path(consumer
, ksession
->uid
,
127 pathname
= strdup("");
134 /* Prep channel message structure */
135 consumer_init_channel_comm_msg(&lkm
,
136 LTTNG_CONSUMER_ADD_CHANNEL
,
142 consumer
->net_seq_index
,
143 channel
->channel
->name
,
144 channel
->stream_count
,
145 channel
->channel
->attr
.output
,
146 CONSUMER_CHANNEL_TYPE_DATA
,
147 channel
->channel
->attr
.tracefile_size
,
148 channel
->channel
->attr
.tracefile_count
,
150 channel
->channel
->attr
.live_timer_interval
,
151 channel_attr_extended
->monitor_timer_interval
);
153 health_code_update();
155 ret
= consumer_send_channel(sock
, &lkm
);
160 health_code_update();
162 session
= session_find_by_id(ksession
->id
);
165 status
= notification_thread_command_add_channel(
166 notification_thread_handle
, session
->name
,
167 ksession
->uid
, ksession
->gid
,
168 channel
->channel
->name
, channel
->key
,
170 channel
->channel
->attr
.subbuf_size
* channel
->channel
->attr
.num_subbuf
);
172 if (status
!= LTTNG_OK
) {
177 channel
->published_to_notification_thread
= true;
185 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
187 * The consumer socket lock must be held by the caller.
189 int kernel_consumer_add_metadata(struct consumer_socket
*sock
,
190 struct ltt_kernel_session
*session
, unsigned int monitor
)
194 struct lttcomm_consumer_msg lkm
;
195 struct consumer_output
*consumer
;
199 assert(session
->consumer
);
202 DBG("Sending metadata %d to kernel consumer", session
->metadata_stream_fd
);
204 /* Get consumer output pointer */
205 consumer
= session
->consumer
;
208 pathname
= create_channel_path(consumer
, session
->uid
, session
->gid
);
211 pathname
= strdup("");
218 /* Prep channel message structure */
219 consumer_init_channel_comm_msg(&lkm
,
220 LTTNG_CONSUMER_ADD_CHANNEL
,
221 session
->metadata
->key
,
226 consumer
->net_seq_index
,
227 DEFAULT_METADATA_NAME
,
229 DEFAULT_KERNEL_CHANNEL_OUTPUT
,
230 CONSUMER_CHANNEL_TYPE_METADATA
,
234 health_code_update();
236 ret
= consumer_send_channel(sock
, &lkm
);
241 health_code_update();
243 /* Prep stream message structure */
244 consumer_init_stream_comm_msg(&lkm
,
245 LTTNG_CONSUMER_ADD_STREAM
,
246 session
->metadata
->key
,
247 session
->metadata_stream_fd
,
248 0); /* CPU: 0 for metadata. */
250 health_code_update();
252 /* Send stream and file descriptor */
253 ret
= consumer_send_stream(sock
, consumer
, &lkm
,
254 &session
->metadata_stream_fd
, 1);
259 health_code_update();
267 * Sending a single stream to the consumer with command ADD_STREAM.
269 int kernel_consumer_add_stream(struct consumer_socket
*sock
,
270 struct ltt_kernel_channel
*channel
, struct ltt_kernel_stream
*stream
,
271 struct ltt_kernel_session
*session
, unsigned int monitor
)
274 struct lttcomm_consumer_msg lkm
;
275 struct consumer_output
*consumer
;
280 assert(session
->consumer
);
283 DBG("Sending stream %d of channel %s to kernel consumer",
284 stream
->fd
, channel
->channel
->name
);
286 /* Get consumer output pointer */
287 consumer
= session
->consumer
;
289 /* Prep stream consumer message */
290 consumer_init_stream_comm_msg(&lkm
,
291 LTTNG_CONSUMER_ADD_STREAM
,
296 health_code_update();
298 /* Send stream and file descriptor */
299 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &stream
->fd
, 1);
304 health_code_update();
311 * Sending the notification that all streams were sent with STREAMS_SENT.
313 int kernel_consumer_streams_sent(struct consumer_socket
*sock
,
314 struct ltt_kernel_session
*session
, uint64_t channel_key
)
317 struct lttcomm_consumer_msg lkm
;
318 struct consumer_output
*consumer
;
323 DBG("Sending streams_sent");
324 /* Get consumer output pointer */
325 consumer
= session
->consumer
;
327 /* Prep stream consumer message */
328 consumer_init_streams_sent_comm_msg(&lkm
,
329 LTTNG_CONSUMER_STREAMS_SENT
,
330 channel_key
, consumer
->net_seq_index
);
332 health_code_update();
334 /* Send stream and file descriptor */
335 ret
= consumer_send_msg(sock
, &lkm
);
345 * Send all stream fds of kernel channel to the consumer.
347 * The consumer socket lock must be held by the caller.
349 int kernel_consumer_send_channel_stream(struct consumer_socket
*sock
,
350 struct ltt_kernel_channel
*channel
, struct ltt_kernel_session
*session
,
351 unsigned int monitor
)
354 struct ltt_kernel_stream
*stream
;
359 assert(session
->consumer
);
362 /* Bail out if consumer is disabled */
363 if (!session
->consumer
->enabled
) {
368 DBG("Sending streams of channel %s to kernel consumer",
369 channel
->channel
->name
);
371 if (!channel
->sent_to_consumer
) {
372 ret
= kernel_consumer_add_channel(sock
, channel
, session
, monitor
);
376 channel
->sent_to_consumer
= true;
380 cds_list_for_each_entry(stream
, &channel
->stream_list
.head
, list
) {
381 if (!stream
->fd
|| stream
->sent_to_consumer
) {
385 /* Add stream on the kernel consumer side. */
386 ret
= kernel_consumer_add_stream(sock
, channel
, stream
, session
,
391 stream
->sent_to_consumer
= true;
399 * Send all stream fds of the kernel session to the consumer.
401 * The consumer socket lock must be held by the caller.
403 int kernel_consumer_send_session(struct consumer_socket
*sock
,
404 struct ltt_kernel_session
*session
)
406 int ret
, monitor
= 0;
407 struct ltt_kernel_channel
*chan
;
411 assert(session
->consumer
);
414 /* Bail out if consumer is disabled */
415 if (!session
->consumer
->enabled
) {
420 /* Don't monitor the streams on the consumer if in flight recorder. */
421 if (session
->output_traces
) {
425 DBG("Sending session stream to kernel consumer");
427 if (session
->metadata_stream_fd
>= 0 && session
->metadata
) {
428 ret
= kernel_consumer_add_metadata(sock
, session
, monitor
);
434 /* Send channel and streams of it */
435 cds_list_for_each_entry(chan
, &session
->channel_list
.head
, list
) {
436 ret
= kernel_consumer_send_channel_stream(sock
, chan
, session
,
443 * Inform the relay that all the streams for the
446 ret
= kernel_consumer_streams_sent(sock
, session
, chan
->key
);
453 DBG("Kernel consumer FDs of metadata and channel streams sent");
455 session
->consumer_fds_sent
= 1;
462 int kernel_consumer_destroy_channel(struct consumer_socket
*socket
,
463 struct ltt_kernel_channel
*channel
)
466 struct lttcomm_consumer_msg msg
;
471 DBG("Sending kernel consumer destroy channel key %" PRIu64
, channel
->key
);
473 memset(&msg
, 0, sizeof(msg
));
474 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
475 msg
.u
.destroy_channel
.key
= channel
->key
;
477 pthread_mutex_lock(socket
->lock
);
478 health_code_update();
480 ret
= consumer_send_msg(socket
, &msg
);
486 health_code_update();
487 pthread_mutex_unlock(socket
->lock
);
491 int kernel_consumer_destroy_metadata(struct consumer_socket
*socket
,
492 struct ltt_kernel_metadata
*metadata
)
495 struct lttcomm_consumer_msg msg
;
500 DBG("Sending kernel consumer destroy channel key %" PRIu64
, metadata
->key
);
502 memset(&msg
, 0, sizeof(msg
));
503 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
504 msg
.u
.destroy_channel
.key
= metadata
->key
;
506 pthread_mutex_lock(socket
->lock
);
507 health_code_update();
509 ret
= consumer_send_msg(socket
, &msg
);
515 health_code_update();
516 pthread_mutex_unlock(socket
->lock
);