Add liblttng-ctl destructor to cleanup memory
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
26
27#include <common/common.h>
28#include <common/defaults.h>
29#include <common/uri.h>
30
31#include "consumer.h"
32
2f77fc4b
DG
33/*
34 * Send destroy relayd command to consumer.
35 *
36 * On success return positive value. On error, negative value.
37 */
38int consumer_send_destroy_relayd(struct consumer_socket *sock,
39 struct consumer_output *consumer)
40{
41 int ret;
42 struct lttcomm_consumer_msg msg;
43
44 assert(consumer);
45 assert(sock);
46
47 DBG2("Sending destroy relayd command to consumer...");
48
49 /* Bail out if consumer is disabled */
50 if (!consumer->enabled) {
f73fabfd 51 ret = LTTNG_OK;
2f77fc4b
DG
52 DBG3("Consumer is disabled");
53 goto error;
54 }
55
56 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
57 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
58
59 pthread_mutex_lock(sock->lock);
60 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
61 pthread_mutex_unlock(sock->lock);
62 if (ret < 0) {
63 PERROR("send consumer destroy relayd command");
64 goto error;
65 }
66
67 DBG2("Consumer send destroy relayd command done");
68
69error:
70 return ret;
71}
72
73/*
74 * For each consumer socket in the consumer output object, send a destroy
75 * relayd command.
76 */
77void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
78{
79 int ret;
80 struct lttng_ht_iter iter;
81 struct consumer_socket *socket;
82
83 assert(consumer);
84
85 /* Destroy any relayd connection */
86 if (consumer && consumer->type == CONSUMER_DST_NET) {
87 rcu_read_lock();
88 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
89 node.node) {
90 /* Send destroy relayd command */
91 ret = consumer_send_destroy_relayd(socket, consumer);
92 if (ret < 0) {
93 ERR("Unable to send destroy relayd command to consumer");
94 /* Continue since we MUST delete everything at this point. */
95 }
96 }
97 rcu_read_unlock();
98 }
99}
100
a4b92340
DG
101/*
102 * From a consumer_data structure, allocate and add a consumer socket to the
103 * consumer output.
104 *
105 * Return 0 on success, else negative value on error
106 */
107int consumer_create_socket(struct consumer_data *data,
108 struct consumer_output *output)
109{
110 int ret = 0;
111 struct consumer_socket *socket;
112
113 assert(data);
114
115 if (output == NULL || data->cmd_sock < 0) {
116 /*
117 * Not an error. Possible there is simply not spawned consumer or it's
118 * disabled for the tracing session asking the socket.
119 */
120 goto error;
121 }
122
123 rcu_read_lock();
124 socket = consumer_find_socket(data->cmd_sock, output);
125 rcu_read_unlock();
126 if (socket == NULL) {
127 socket = consumer_allocate_socket(data->cmd_sock);
128 if (socket == NULL) {
129 ret = -1;
130 goto error;
131 }
132
2f77fc4b 133 socket->registered = 0;
a4b92340
DG
134 socket->lock = &data->lock;
135 rcu_read_lock();
136 consumer_add_socket(socket, output);
137 rcu_read_unlock();
138 }
139
140 DBG3("Consumer socket created (fd: %d) and added to output",
141 data->cmd_sock);
142
143error:
144 return ret;
145}
146
173af62f
DG
147/*
148 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
149 * be acquired before calling this function and across use of the
150 * returned consumer_socket.
151 */
152struct consumer_socket *consumer_find_socket(int key,
153 struct consumer_output *consumer)
154{
155 struct lttng_ht_iter iter;
156 struct lttng_ht_node_ulong *node;
157 struct consumer_socket *socket = NULL;
158
159 /* Negative keys are lookup failures */
a4b92340 160 if (key < 0 || consumer == NULL) {
173af62f
DG
161 return NULL;
162 }
163
164 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
165 &iter);
166 node = lttng_ht_iter_get_node_ulong(&iter);
167 if (node != NULL) {
168 socket = caa_container_of(node, struct consumer_socket, node);
169 }
170
171 return socket;
172}
173
174/*
175 * Allocate a new consumer_socket and return the pointer.
176 */
177struct consumer_socket *consumer_allocate_socket(int fd)
178{
179 struct consumer_socket *socket = NULL;
180
181 socket = zmalloc(sizeof(struct consumer_socket));
182 if (socket == NULL) {
183 PERROR("zmalloc consumer socket");
184 goto error;
185 }
186
187 socket->fd = fd;
188 lttng_ht_node_init_ulong(&socket->node, fd);
189
190error:
191 return socket;
192}
193
194/*
195 * Add consumer socket to consumer output object. Read side lock must be
196 * acquired before calling this function.
197 */
198void consumer_add_socket(struct consumer_socket *sock,
199 struct consumer_output *consumer)
200{
201 assert(sock);
202 assert(consumer);
203
204 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
205}
206
207/*
208 * Delte consumer socket to consumer output object. Read side lock must be
209 * acquired before calling this function.
210 */
211void consumer_del_socket(struct consumer_socket *sock,
212 struct consumer_output *consumer)
213{
214 int ret;
215 struct lttng_ht_iter iter;
216
217 assert(sock);
218 assert(consumer);
219
220 iter.iter.node = &sock->node.node;
221 ret = lttng_ht_del(consumer->socks, &iter);
222 assert(!ret);
223}
224
225/*
226 * RCU destroy call function.
227 */
228static void destroy_socket_rcu(struct rcu_head *head)
229{
230 struct lttng_ht_node_ulong *node =
231 caa_container_of(head, struct lttng_ht_node_ulong, head);
232 struct consumer_socket *socket =
233 caa_container_of(node, struct consumer_socket, node);
234
235 free(socket);
236}
237
238/*
239 * Destroy and free socket pointer in a call RCU. Read side lock must be
240 * acquired before calling this function.
241 */
242void consumer_destroy_socket(struct consumer_socket *sock)
243{
244 assert(sock);
245
246 /*
247 * We DO NOT close the file descriptor here since it is global to the
2f77fc4b
DG
248 * session daemon and is closed only if the consumer dies or a custom
249 * consumer was registered,
173af62f 250 */
2f77fc4b
DG
251 if (sock->registered) {
252 DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
253 lttcomm_close_unix_sock(sock->fd);
254 }
173af62f
DG
255
256 call_rcu(&sock->node.head, destroy_socket_rcu);
257}
258
00e2e675
DG
259/*
260 * Allocate and assign data to a consumer_output object.
261 *
262 * Return pointer to structure.
263 */
264struct consumer_output *consumer_create_output(enum consumer_dst_type type)
265{
266 struct consumer_output *output = NULL;
267
268 output = zmalloc(sizeof(struct consumer_output));
269 if (output == NULL) {
270 PERROR("zmalloc consumer_output");
271 goto error;
272 }
273
274 /* By default, consumer output is enabled */
275 output->enabled = 1;
276 output->type = type;
277 output->net_seq_index = -1;
173af62f
DG
278
279 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
280
281error:
282 return output;
283}
284
285/*
286 * Delete the consumer_output object from the list and free the ptr.
287 */
288void consumer_destroy_output(struct consumer_output *obj)
289{
290 if (obj == NULL) {
291 return;
292 }
293
173af62f
DG
294 if (obj->socks) {
295 struct lttng_ht_iter iter;
296 struct consumer_socket *socket;
297
2f77fc4b 298 rcu_read_lock();
173af62f 299 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
2f77fc4b 300 consumer_del_socket(socket, obj);
173af62f
DG
301 consumer_destroy_socket(socket);
302 }
2f77fc4b
DG
303 rcu_read_unlock();
304
305 /* Finally destroy HT */
306 lttng_ht_destroy(obj->socks);
00e2e675 307 }
173af62f 308
00e2e675
DG
309 free(obj);
310}
311
312/*
313 * Copy consumer output and returned the newly allocated copy.
314 */
315struct consumer_output *consumer_copy_output(struct consumer_output *obj)
316{
09a90bcd 317 struct lttng_ht *tmp_ht_ptr;
173af62f
DG
318 struct lttng_ht_iter iter;
319 struct consumer_socket *socket, *copy_sock;
00e2e675
DG
320 struct consumer_output *output;
321
322 assert(obj);
323
324 output = consumer_create_output(obj->type);
325 if (output == NULL) {
326 goto error;
327 }
09a90bcd
DG
328 /* Avoid losing the HT reference after the memcpy() */
329 tmp_ht_ptr = output->socks;
00e2e675
DG
330
331 memcpy(output, obj, sizeof(struct consumer_output));
332
09a90bcd
DG
333 /* Putting back the HT pointer and start copying socket(s). */
334 output->socks = tmp_ht_ptr;
173af62f
DG
335
336 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
337 /* Create new socket object. */
338 copy_sock = consumer_allocate_socket(socket->fd);
339 if (copy_sock == NULL) {
340 goto malloc_error;
341 }
342
09a90bcd 343 copy_sock->registered = socket->registered;
173af62f
DG
344 copy_sock->lock = socket->lock;
345 consumer_add_socket(copy_sock, output);
346 }
347
00e2e675
DG
348error:
349 return output;
173af62f
DG
350
351malloc_error:
352 consumer_destroy_output(output);
353 return NULL;
00e2e675
DG
354}
355
356/*
357 * Set network URI to the consumer output object.
358 *
ad20f474
DG
359 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
360 * error.
00e2e675
DG
361 */
362int consumer_set_network_uri(struct consumer_output *obj,
363 struct lttng_uri *uri)
364{
365 int ret;
366 char tmp_path[PATH_MAX];
367 char hostname[HOST_NAME_MAX];
368 struct lttng_uri *dst_uri = NULL;
369
370 /* Code flow error safety net. */
371 assert(obj);
372 assert(uri);
373
374 switch (uri->stype) {
375 case LTTNG_STREAM_CONTROL:
376 dst_uri = &obj->dst.net.control;
377 obj->dst.net.control_isset = 1;
378 if (uri->port == 0) {
379 /* Assign default port. */
380 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
381 }
ad20f474 382 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
383 break;
384 case LTTNG_STREAM_DATA:
385 dst_uri = &obj->dst.net.data;
386 obj->dst.net.data_isset = 1;
387 if (uri->port == 0) {
388 /* Assign default port. */
389 uri->port = DEFAULT_NETWORK_DATA_PORT;
390 }
ad20f474 391 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
392 break;
393 default:
394 ERR("Set network uri type unknown %d", uri->stype);
395 goto error;
396 }
397
398 ret = uri_compare(dst_uri, uri);
399 if (!ret) {
400 /* Same URI, don't touch it and return success. */
401 DBG3("URI network compare are the same");
ad20f474 402 goto equal;
00e2e675
DG
403 }
404
405 /* URIs were not equal, replacing it. */
406 memset(dst_uri, 0, sizeof(struct lttng_uri));
407 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
408 obj->type = CONSUMER_DST_NET;
409
410 /* Handle subdir and add hostname in front. */
411 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
412 /* Get hostname to append it in the pathname */
413 ret = gethostname(hostname, sizeof(hostname));
414 if (ret < 0) {
415 PERROR("gethostname. Fallback on default localhost");
416 strncpy(hostname, "localhost", sizeof(hostname));
417 }
418 hostname[sizeof(hostname) - 1] = '\0';
419
420 /* Setup consumer subdir if none present in the control URI */
421 if (strlen(dst_uri->subdir) == 0) {
422 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
423 hostname, obj->subdir);
424 } else {
425 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
426 hostname, dst_uri->subdir);
427 }
428 if (ret < 0) {
429 PERROR("snprintf set consumer uri subdir");
430 goto error;
431 }
432
433 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
434 DBG3("Consumer set network uri subdir path %s", tmp_path);
435 }
436
00e2e675 437 return 0;
ad20f474
DG
438equal:
439 return 1;
00e2e675
DG
440error:
441 return -1;
442}
443
444/*
445 * Send file descriptor to consumer via sock.
446 */
447int consumer_send_fds(int sock, int *fds, size_t nb_fd)
448{
449 int ret;
450
451 assert(fds);
452 assert(nb_fd > 0);
453
454 ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
455 if (ret < 0) {
456 PERROR("send consumer fds");
457 goto error;
458 }
459
460error:
461 return ret;
462}
463
464/*
465 * Consumer send channel communication message structure to consumer.
466 */
467int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
468{
469 int ret;
470
471 assert(msg);
472 assert(sock >= 0);
473
474 ret = lttcomm_send_unix_sock(sock, msg,
475 sizeof(struct lttcomm_consumer_msg));
476 if (ret < 0) {
477 PERROR("send consumer channel");
478 goto error;
479 }
480
481error:
482 return ret;
483}
484
485/*
486 * Init channel communication message structure.
487 */
488void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
489 enum lttng_consumer_command cmd,
490 int channel_key,
491 uint64_t max_sb_size,
492 uint64_t mmap_len,
c30aaa51
MD
493 const char *name,
494 unsigned int nb_init_streams)
00e2e675
DG
495{
496 assert(msg);
497
498 /* TODO: Args validation */
499
500 /* Zeroed structure */
501 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
502
503 /* Send channel */
504 msg->cmd_type = cmd;
505 msg->u.channel.channel_key = channel_key;
506 msg->u.channel.max_sb_size = max_sb_size;
507 msg->u.channel.mmap_len = mmap_len;
c30aaa51 508 msg->u.channel.nb_init_streams = nb_init_streams;
00e2e675
DG
509}
510
511/*
512 * Init stream communication message structure.
513 */
514void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
515 enum lttng_consumer_command cmd,
516 int channel_key,
517 int stream_key,
518 uint32_t state,
519 enum lttng_event_output output,
520 uint64_t mmap_len,
521 uid_t uid,
522 gid_t gid,
523 int net_index,
524 unsigned int metadata_flag,
525 const char *name,
ca22feea
DG
526 const char *pathname,
527 unsigned int session_id)
00e2e675
DG
528{
529 assert(msg);
530
531 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
532
533 /* TODO: Args validation */
534
535 msg->cmd_type = cmd;
536 msg->u.stream.channel_key = channel_key;
537 msg->u.stream.stream_key = stream_key;
538 msg->u.stream.state = state;
539 msg->u.stream.output = output;
540 msg->u.stream.mmap_len = mmap_len;
541 msg->u.stream.uid = uid;
542 msg->u.stream.gid = gid;
543 msg->u.stream.net_index = net_index;
544 msg->u.stream.metadata_flag = metadata_flag;
ca22feea 545 msg->u.stream.session_id = (uint64_t) session_id;
00e2e675
DG
546 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
547 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
548 strncpy(msg->u.stream.path_name, pathname,
549 sizeof(msg->u.stream.path_name));
550 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
551}
552
553/*
554 * Send stream communication structure to the consumer.
555 */
556int consumer_send_stream(int sock, struct consumer_output *dst,
557 struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
558{
559 int ret;
560
561 assert(msg);
562 assert(dst);
563
564 switch (dst->type) {
565 case CONSUMER_DST_NET:
566 /* Consumer should send the stream on the network. */
567 msg->u.stream.net_index = dst->net_seq_index;
568 break;
569 case CONSUMER_DST_LOCAL:
570 /* Add stream file name to stream path */
c30ce0b3
CB
571 strncat(msg->u.stream.path_name, "/",
572 sizeof(msg->u.stream.path_name) -
573 strlen(msg->u.stream.path_name) - 1);
00e2e675 574 strncat(msg->u.stream.path_name, msg->u.stream.name,
c30ce0b3
CB
575 sizeof(msg->u.stream.path_name) -
576 strlen(msg->u.stream.path_name) - 1);
00e2e675
DG
577 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
578 /* Indicate that the stream is NOT network */
579 msg->u.stream.net_index = -1;
580 break;
581 default:
582 ERR("Consumer unknown output type (%d)", dst->type);
583 ret = -1;
584 goto error;
585 }
586
587 /* Send on socket */
588 ret = lttcomm_send_unix_sock(sock, msg,
589 sizeof(struct lttcomm_consumer_msg));
590 if (ret < 0) {
591 PERROR("send consumer stream");
592 goto error;
593 }
594
595 ret = consumer_send_fds(sock, fds, nb_fd);
596 if (ret < 0) {
597 goto error;
598 }
599
600error:
601 return ret;
602}
37278a1e
DG
603
604/*
605 * Send relayd socket to consumer associated with a session name.
606 *
607 * On success return positive value. On error, negative value.
608 */
609int consumer_send_relayd_socket(int consumer_sock,
610 struct lttcomm_sock *sock, struct consumer_output *consumer,
611 enum lttng_stream_type type)
612{
613 int ret;
614 struct lttcomm_consumer_msg msg;
615
616 /* Code flow error. Safety net. */
617 assert(sock);
618 assert(consumer);
619
620 /* Bail out if consumer is disabled */
621 if (!consumer->enabled) {
f73fabfd 622 ret = LTTNG_OK;
37278a1e
DG
623 goto error;
624 }
625
626 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
627 /*
628 * Assign network consumer output index using the temporary consumer since
629 * this call should only be made from within a set_consumer_uri() function
630 * call in the session daemon.
631 */
632 msg.u.relayd_sock.net_index = consumer->net_seq_index;
633 msg.u.relayd_sock.type = type;
634 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
635
173af62f 636 DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
37278a1e
DG
637 ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
638 if (ret < 0) {
639 PERROR("send consumer relayd socket info");
640 goto error;
641 }
642
643 DBG3("Sending relayd socket file descriptor to consumer");
644 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
645 if (ret < 0) {
646 goto error;
647 }
648
649 DBG2("Consumer relayd socket sent");
650
651error:
652 return ret;
653}
173af62f
DG
654
655/*
2f77fc4b
DG
656 * Set consumer subdirectory using the session name and a generated datetime if
657 * needed. This is appended to the current subdirectory.
173af62f 658 */
2f77fc4b
DG
659int consumer_set_subdir(struct consumer_output *consumer,
660 const char *session_name)
173af62f 661{
2f77fc4b
DG
662 int ret = 0;
663 unsigned int have_default_name = 0;
664 char datetime[16], tmp_path[PATH_MAX];
665 time_t rawtime;
666 struct tm *timeinfo;
173af62f
DG
667
668 assert(consumer);
2f77fc4b
DG
669 assert(session_name);
670
671 memset(tmp_path, 0, sizeof(tmp_path));
672
673 /* Flag if we have a default session. */
674 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
675 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
676 have_default_name = 1;
677 } else {
678 /* Get date and time for session path */
679 time(&rawtime);
680 timeinfo = localtime(&rawtime);
681 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
173af62f
DG
682 }
683
2f77fc4b
DG
684 if (have_default_name) {
685 ret = snprintf(tmp_path, sizeof(tmp_path),
686 "%s/%s", consumer->subdir, session_name);
687 } else {
688 ret = snprintf(tmp_path, sizeof(tmp_path),
689 "%s/%s-%s/", consumer->subdir, session_name, datetime);
690 }
173af62f 691 if (ret < 0) {
2f77fc4b 692 PERROR("snprintf session name date");
173af62f
DG
693 goto error;
694 }
695
2f77fc4b
DG
696 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
697 DBG2("Consumer subdir set to %s", consumer->subdir);
173af62f
DG
698
699error:
700 return ret;
701}
806e2684
DG
702
703/*
704 * Ask the consumer if the data is ready to bread (available) for the specific
705 * session id.
706 *
707 * This function has a different behavior with the consumer i.e. that it waits
708 * for a reply from the consumer if yes or no the data is available.
709 */
710int consumer_is_data_available(unsigned int id,
711 struct consumer_output *consumer)
712{
713 int ret;
c801b30e 714 int32_t ret_code = 1; /* Default is that the data is available */
806e2684
DG
715 struct consumer_socket *socket;
716 struct lttng_ht_iter iter;
717 struct lttcomm_consumer_msg msg;
718
719 assert(consumer);
720
721 msg.cmd_type = LTTNG_CONSUMER_DATA_AVAILABLE;
722
723 msg.u.data_available.session_id = (uint64_t) id;
724
725 DBG3("Consumer data available for id %u", id);
726
c8f59ee5 727 /* Send command for each consumer */
806e2684
DG
728 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
729 node.node) {
730 /* Code flow error */
731 assert(socket->fd >= 0);
732
733 pthread_mutex_lock(socket->lock);
734
735 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
736 if (ret < 0) {
737 PERROR("send consumer data available command");
738 pthread_mutex_unlock(socket->lock);
739 goto error;
740 }
741
742 /*
743 * Waiting for the reply code where 0 the data is not available and 1
744 * it is for trace reading.
745 */
746 ret = lttcomm_recv_unix_sock(socket->fd, &ret_code, sizeof(ret_code));
747 if (ret < 0) {
748 PERROR("recv consumer data available status");
749 pthread_mutex_unlock(socket->lock);
750 goto error;
751 }
752
753 pthread_mutex_unlock(socket->lock);
754
755 if (ret_code == 0) {
756 break;
757 }
758 }
759
760 DBG("Consumer data available ret %d", ret_code);
761 return ret_code;
762
763error:
764 return -1;
765}
This page took 0.055399 seconds and 4 git commands to generate.