* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; only version 2
- * of the License.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
*
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
*
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _GNU_SOURCE
#include <assert.h>
-#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
{
struct lttng_consumer_stream *stream;
+ rcu_read_lock();
stream = consumer_find_stream(key);
- if (stream)
+ if (stream) {
stream->key = -1;
+ /*
+ * We don't want the lookup to match, but we still need
+ * to iterate on this stream when iterating over the hash table. Just
+ * change the node key.
+ */
+ stream->node.key = -1;
+ }
+ rcu_read_unlock();
}
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_consumer_channel *channel;
+ rcu_read_lock();
channel = consumer_find_channel(key);
- if (channel)
+ if (channel) {
channel->key = -1;
+ /*
+ * We don't want the lookup to match, but we still need
+ * to iterate on this channel when iterating over the hash table. Just
+ * change the node key.
+ */
+ channel->node.key = -1;
+ }
+ rcu_read_unlock();
}
static
}
rcu_read_lock();
-
- /* Get stream node from hash table */
- lttng_ht_lookup(consumer_data.stream_ht,
- (void *)((unsigned long) stream->key), &iter);
- /*
- * Remove stream node from hash table. It can fail if it's been
- * replaced due to key reuse.
- */
- (void) lttng_ht_del(consumer_data.stream_ht, &iter);
+ iter.iter.node = &stream->node.node;
+ ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+ assert(!ret);
rcu_read_unlock();
int consumer_add_stream(struct lttng_consumer_stream *stream)
{
int ret = 0;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
rcu_read_lock();
- /*
- * We simply remove the old channel from the hash table. It's
- * ok, since we know for sure the sessiond wants to replace it
- * with the new version, because the key has been reused.
- */
- (void) lttng_ht_add_replace_ulong(consumer_data.stream_ht, &stream->node);
+
+ lttng_ht_lookup(consumer_data.stream_ht,
+ (void *)((unsigned long) stream->key), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ rcu_read_unlock();
+ /* Stream already exist. Ignore the insertion */
+ goto end;
+ }
+
+ lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
rcu_read_unlock();
consumer_data.stream_count++;
consumer_data.need_update = 1;
}
rcu_read_lock();
-
- lttng_ht_lookup(consumer_data.channel_ht,
- (void *)((unsigned long) channel->key), &iter);
-
- /*
- * Remove channel node from hash table. It can fail if it's been
- * replaced due to key reuse.
- */
- (void) lttng_ht_del(consumer_data.channel_ht, &iter);
-
+ iter.iter.node = &channel->node.node;
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
rcu_read_unlock();
if (channel->mmap_base != NULL) {
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+
pthread_mutex_lock(&consumer_data.lock);
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
rcu_read_lock();
- /*
- * We simply remove the old channel from the hash table. It's
- * ok, since we know for sure the sessiond wants to replace it
- * with the new version, because the key has been reused.
- */
- (void) lttng_ht_add_replace_ulong(consumer_data.channel_ht, &channel->node);
+
+ lttng_ht_lookup(consumer_data.channel_ht,
+ (void *)((unsigned long) channel->key), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ /* Channel already exist. Ignore the insertion */
+ goto end;
+ }
+
+ lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+
+end:
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
* increment i so nb_fd is the number of real FD.
*/
(*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
- (*pollfd)[i].events = POLLIN;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
perror("Poll error");
goto exit;
}
- if (consumer_sockpoll[0].revents == POLLIN) {
+ if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
DBG("consumer_should_quit wake up");
goto exit;
}
}
rcu_read_unlock();
+
+ lttng_ht_destroy(consumer_data.stream_ht);
+ lttng_ht_destroy(consumer_data.channel_ht);
}
/*
if (orig_offset < stream->chan->max_sb_size) {
return;
}
- sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
+ lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
stream->chan->max_sb_size,
SYNC_FILE_RANGE_WAIT_BEFORE
| SYNC_FILE_RANGE_WRITE
goto error_poll_pipe;
}
+ /* set read end of the pipe to non-blocking */
+ ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ perror("fcntl O_NONBLOCK");
+ goto error_poll_fcntl;
+ }
+
+ /* set write end of the pipe to non-blocking */
+ ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ perror("fcntl O_NONBLOCK");
+ goto error_poll_fcntl;
+ }
+
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
perror("Error creating recv pipe");
PERROR("close");
}
}
+error_poll_fcntl:
error_quit_pipe:
for (i = 0; i < 2; i++) {
int err;
ERR("Unknown consumer_data type");
assert(0);
}
+
+ return 0;
}
/*
struct lttng_consumer_stream **local_stream = NULL;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
- char tmp;
- int tmp2;
struct lttng_consumer_local_data *ctx = data;
rcu_register_thread();
* array. We want to prioritize array update over
* low-priority reads.
*/
- if (pollfd[nb_fd].revents & POLLIN) {
+ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
+ size_t pipe_readlen;
+ char tmp;
+
DBG("consumer_poll_pipe wake up");
- tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
- if (tmp2 < 0) {
- perror("read consumer poll");
- }
+ /* Consume 1 byte of pipe data */
+ do {
+ pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+ } while (pipe_readlen == -1 && errno == EINTR);
continue;
}
- /* Take care of high priority channels first. */
+
+ /* Check if each pipe has data. hack for cygwin. */
for (i = 0; i < nb_fd; i++) {
- if (pollfd[i].revents & POLLPRI) {
- ssize_t len;
+ if ((pollfd[i].revents & POLLIN) ||
+ local_stream[i]->hangup_flush_done) {
+ int check_ret;
- DBG("Urgent read on fd %d", pollfd[i].fd);
- high_prio = 1;
- len = ctx->on_buffer_ready(local_stream[i], ctx);
- /* it's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
- goto end;
- } else if (len > 0) {
- local_stream[i]->data_read = 1;
+ check_ret = lttng_consumer_check_pipe(local_stream[i], ctx);
+ if (check_ret != 0) {
+ pollfd[i].revents |= POLLHUP;
}
}
}
- /*
- * If we read high prio channel in this loop, try again
- * for more high prio data.
- */
- if (high_prio) {
- continue;
- }
+ /* Take care of high priority channels first. */
+ /* for (i = 0; i < nb_fd; i++) { */
+ /* DBG("!!! POLL FLAGS: %d", pollfd[i].revents); */
+ /* if (pollfd[i].revents & POLLPRI) { */
+ /* ssize_t len; */
+
+ /* DBG("Urgent read on fd %d", pollfd[i].fd); */
+ /* high_prio = 1; */
+ /* len = ctx->on_buffer_ready(local_stream[i], ctx); */
+ /* /\* it's ok to have an unavailable sub-buffer *\/ */
+ /* if (len < 0 && len != -EAGAIN) { */
+ /* goto end; */
+ /* } else if (len > 0) { */
+ /* local_stream[i]->data_read = 1; */
+ /* } */
+ /* } */
+ /* } */
+
+ /* /\* */
+ /* * If we read high prio channel in this loop, try again */
+ /* * for more high prio data. */
+ /* *\/ */
+ /* if (high_prio) { */
+ /* continue; */
+ /* } */
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
local_stream[i]->hangup_flush_done) {
ssize_t len;
- assert(!(pollfd[i].revents & POLLERR));
- assert(!(pollfd[i].revents & POLLNVAL));
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
DBG("Received STOP command");
goto end;
}
- if (ret < 0) {
- ERR("Communication interrupted on command socket");
+ if (ret <= 0) {
+ /*
+ * This could simply be a session daemon quitting. Don't output
+ * ERR() here.
+ */
+ DBG("Communication interrupted on command socket");
goto end;
}
if (consumer_quit) {
*/
consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
- /* wake up the polling thread */
- ret = write(ctx->consumer_poll_pipe[1], "4", 1);
- if (ret < 0) {
- perror("poll pipe write");
- }
+ /*
+ * Wake-up the other end by writing a null byte in the pipe
+ * (non-blocking). Important note: Because writing into the
+ * pipe is non-blocking (and therefore we allow dropping wakeup
+ * data, as long as there is wakeup data present in the pipe
+ * buffer to wake up the other end), the other end should
+ * perform the following sequence for waiting:
+ * 1) empty the pipe (reads).
+ * 2) perform update operation.
+ * 3) wait on the pipe (poll).
+ */
+ do {
+ ret = write(ctx->consumer_poll_pipe[1], "", 1);
+ } while (ret == -1UL && errno == EINTR);
rcu_unregister_thread();
return NULL;
}
}
}
+int lttng_consumer_check_pipe(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ assert(0);
+ return -ENOSYS;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_check_pipe(stream, ctx);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
switch (consumer_data.type) {