projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Add lttng hash table support to liblttng-consumer
[lttng-tools.git]
/
liblttng-consumer
/
lttng-consumer.c
diff --git
a/liblttng-consumer/lttng-consumer.c
b/liblttng-consumer/lttng-consumer.c
index 0811e68ca8e8368c7695f92f37a44c906b42b7c0..f4af47404c6053af72f90107a5ea6acf9c3650c2 100644
(file)
--- a/
liblttng-consumer/lttng-consumer.c
+++ b/
liblttng-consumer/lttng-consumer.c
@@
-37,8
+37,6
@@
#include <lttngerr.h>
struct lttng_consumer_global_data consumer_data = {
#include <lttngerr.h>
struct lttng_consumer_global_data consumer_data = {
- .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
- .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
.stream_count = 0,
.need_update = 1,
.type = LTTNG_CONSUMER_UNKNOWN,
.stream_count = 0,
.need_update = 1,
.type = LTTNG_CONSUMER_UNKNOWN,
@@
-61,18
+59,22
@@
volatile int consumer_quit = 0;
*/
static struct lttng_consumer_stream *consumer_find_stream(int key)
{
*/
static struct lttng_consumer_stream *consumer_find_stream(int key)
{
- struct lttng_consumer_stream *iter;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_consumer_stream *stream = NULL;
/* Negative keys are lookup failures */
if (key < 0)
return NULL;
/* Negative keys are lookup failures */
if (key < 0)
return NULL;
- cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
- if (iter->key == key) {
- DBG("Found stream key %d", key);
- return iter;
- }
+
+ lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ stream = caa_container_of(node, struct lttng_consumer_stream, node);
}
}
- return NULL;
+
+ return stream;
}
static void consumer_steal_stream_key(int key)
}
static void consumer_steal_stream_key(int key)
@@
-86,18
+88,22
@@
static void consumer_steal_stream_key(int key)
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
- struct lttng_consumer_channel *iter;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_consumer_channel *channel = NULL;
/* Negative keys are lookup failures */
if (key < 0)
return NULL;
/* Negative keys are lookup failures */
if (key < 0)
return NULL;
- cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
- if (iter->key == key) {
- DBG("Found channel key %d", key);
- return iter;
- }
+
+ lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
}
- return NULL;
+
+ return channel;
}
static void consumer_steal_channel_key(int key)
}
static void consumer_steal_channel_key(int key)
@@
-116,6
+122,7
@@
static void consumer_steal_channel_key(int key)
void consumer_del_stream(struct lttng_consumer_stream *stream)
{
int ret;
void consumer_del_stream(struct lttng_consumer_stream *stream)
{
int ret;
+ struct lttng_ht_iter iter;
struct lttng_consumer_channel *free_chan = NULL;
pthread_mutex_lock(&consumer_data.lock);
struct lttng_consumer_channel *free_chan = NULL;
pthread_mutex_lock(&consumer_data.lock);
@@
-139,7
+146,13
@@
void consumer_del_stream(struct lttng_consumer_stream *stream)
goto end;
}
goto end;
}
- cds_list_del(&stream->list);
+ /* Get stream node from hash table */
+ lttng_ht_lookup(consumer_data.stream_ht,
+ (void *)((unsigned long) stream->key), &iter);
+ /* Remove stream node from hash table */
+ ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+ assert(!ret);
+
if (consumer_data.stream_count <= 0) {
goto end;
}
if (consumer_data.stream_count <= 0) {
goto end;
}
@@
-205,6
+218,7
@@
struct lttng_consumer_stream *consumer_allocate_stream(
stream->gid = gid;
strncpy(stream->path_name, path_name, PATH_MAX - 1);
stream->path_name[PATH_MAX - 1] = '\0';
stream->gid = gid;
strncpy(stream->path_name, path_name, PATH_MAX - 1);
stream->path_name[PATH_MAX - 1] = '\0';
+ lttng_ht_node_init_ulong(&stream->node, stream->key);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
@@
-243,7
+257,7
@@
int consumer_add_stream(struct lttng_consumer_stream *stream)
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
-
cds_list_add(&stream->list, &consumer_data.stream_list.head
);
+
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node
);
consumer_data.stream_count++;
consumer_data.need_update = 1;
consumer_data.stream_count++;
consumer_data.need_update = 1;
@@
-290,6
+304,7
@@
void consumer_change_stream_state(int stream_key,
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
int ret;
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
int ret;
+ struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&consumer_data.lock);
@@
-306,7
+321,11
@@
void consumer_del_channel(struct lttng_consumer_channel *channel)
goto end;
}
goto end;
}
- cds_list_del(&channel->list);
+ lttng_ht_lookup(consumer_data.channel_ht,
+ (void *)((unsigned long) channel->key), &iter);
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
@@
-346,6
+365,7
@@
struct lttng_consumer_channel *consumer_allocate_channel(
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
channel->nr_streams = 0;
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
channel->nr_streams = 0;
+ lttng_ht_node_init_ulong(&channel->node, channel->key);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
@@
-383,7
+403,7
@@
int consumer_add_channel(struct lttng_consumer_channel *channel)
pthread_mutex_lock(&consumer_data.lock);
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
pthread_mutex_lock(&consumer_data.lock);
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
-
cds_list_add(&channel->list, &consumer_data.channel_list.head
);
+
lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node
);
pthread_mutex_unlock(&consumer_data.lock);
return 0;
}
pthread_mutex_unlock(&consumer_data.lock);
return 0;
}
@@
-399,18
+419,20
@@
int consumer_update_poll_array(
struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
struct lttng_consumer_stream **local_stream)
{
struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
struct lttng_consumer_stream **local_stream)
{
- struct lttng_consumer_stream *iter;
int i = 0;
int i = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
DBG("Updating poll fd array");
DBG("Updating poll fd array");
- cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
- if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+ cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
continue;
}
continue;
}
- DBG("Active FD %d",
iter
->wait_fd);
- (*pollfd)[i].fd =
iter
->wait_fd;
+ DBG("Active FD %d",
stream
->wait_fd);
+ (*pollfd)[i].fd =
stream
->wait_fd;
(*pollfd)[i].events = POLLIN | POLLPRI;
(*pollfd)[i].events = POLLIN | POLLPRI;
- local_stream[i] =
iter
;
+ local_stream[i] =
stream
;
i++;
}
i++;
}
@@
-486,21
+508,28
@@
int lttng_consumer_send_error(
*/
void lttng_consumer_cleanup(void)
{
*/
void lttng_consumer_cleanup(void)
{
- struct lttng_consumer_stream *iter, *tmp;
- struct lttng_consumer_channel *citer, *ctmp;
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ struct lttng_consumer_channel *channel;
/*
* close all outfd. Called when there are no more threads
* running (after joining on the threads), no need to protect
* list iteration with mutex.
*/
/*
* close all outfd. Called when there are no more threads
* running (after joining on the threads), no need to protect
* list iteration with mutex.
*/
- cds_list_for_each_entry_safe(iter, tmp,
- &consumer_data.stream_list.head, list) {
- consumer_del_stream(iter);
+ cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
+ node.node) {
+ ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+ assert(!ret);
+ consumer_del_stream(stream);
}
}
- cds_list_for_each_entry_safe(citer, ctmp,
- &consumer_data.channel_list.head, list) {
- consumer_del_channel(citer);
+
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
+ node.node) {
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+ consumer_del_channel(channel);
}
}
}
}
@@
-759,7
+788,7
@@
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
}
/*
}
/*
- * This thread polls the fds in the
ltt_fd_lis
t to consume the data and write
+ * This thread polls the fds in the
se
t to consume the data and write
* it to tracefile if necessary.
*/
void *lttng_consumer_thread_poll_fds(void *data)
* it to tracefile if necessary.
*/
void *lttng_consumer_thread_poll_fds(void *data)
@@
-781,7
+810,7
@@
void *lttng_consumer_thread_poll_fds(void *data)
num_hup = 0;
/*
num_hup = 0;
/*
- * the
ltt_fd_lis
t has been updated, we need to update our
+ * the
fds se
t has been updated, we need to update our
* local array as well
*/
pthread_mutex_lock(&consumer_data.lock);
* local array as well
*/
pthread_mutex_lock(&consumer_data.lock);
@@
-1073,3
+1102,13
@@
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
return -ENOSYS;
}
}
return -ENOSYS;
}
}
+
+/*
+ * Allocate and set consumer data hash tables.
+ */
+void lttng_consumer_init(void)
+{
+ consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+}
+
This page took
0.026286 seconds
and
4
git commands to generate.