#include "ust-ctl.h"
#include "utils.h"
+static
+int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess);
+
/* Next available channel key. Access under next_channel_key_lock. */
static uint64_t _next_channel_key;
static pthread_mutex_t next_channel_key_lock = PTHREAD_MUTEX_INITIALIZER;
return ret;
}
+/*
+ * Match function for a hash table lookup of ust_app_ctx.
+ *
+ * It matches an ust app context based on the context type and, in the case
+ * of perf counters, their name.
+ */
+static int ht_match_ust_app_ctx(struct cds_lfht_node *node, const void *_key)
+{
+ struct ust_app_ctx *ctx;
+ const struct lttng_ust_context *key;
+
+ assert(node);
+ assert(_key);
+
+ ctx = caa_container_of(node, struct ust_app_ctx, node.node);
+ key = _key;
+
+ /* Context type */
+ if (ctx->ctx.ctx != key->ctx) {
+ goto no_match;
+ }
+
+ /* Check the name in the case of perf thread counters. */
+ if (key->ctx == LTTNG_UST_CONTEXT_PERF_THREAD_COUNTER) {
+ if (strncmp(key->u.perf_counter.name,
+ ctx->ctx.u.perf_counter.name,
+ sizeof(key->u.perf_counter.name))) {
+ goto no_match;
+ }
+ }
+
+ /* Match. */
+ return 1;
+
+no_match:
+ return 0;
+}
+
+/*
+ * Lookup for an ust app context from an lttng_ust_context.
+ *
+ * Must be called while holding RCU read side lock.
+ * Return an ust_app_ctx object or NULL on error.
+ */
+static
+struct ust_app_ctx *find_ust_app_context(struct lttng_ht *ht,
+ struct lttng_ust_context *uctx)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct ust_app_ctx *app_ctx = NULL;
+
+ assert(uctx);
+ assert(ht);
+
+ /* Lookup using the lttng_ust_context_type and a custom match fct. */
+ cds_lfht_lookup(ht->ht, ht->hash_fct((void *) uctx->ctx, lttng_ht_seed),
+ ht_match_ust_app_ctx, uctx, &iter.iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (!node) {
+ goto end;
+ }
+
+ app_ctx = caa_container_of(node, struct ust_app_ctx, node);
+
+end:
+ return app_ctx;
+}
+
/*
* Create a context for the channel on the tracer.
*
struct ust_app *app)
{
int ret = 0;
- struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
struct ust_app_ctx *ua_ctx;
DBG2("UST app adding context to channel %s", ua_chan->name);
- lttng_ht_lookup(ua_chan->ctx, (void *)((unsigned long)uctx->ctx), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node != NULL) {
+ ua_ctx = find_ust_app_context(ua_chan->ctx, uctx);
+ if (ua_ctx) {
ret = -EEXIST;
goto error;
}
{
struct ust_app *lta;
struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter ust_app_sock_iter;
struct lttng_ht_iter iter;
struct ust_app_session *ua_sess;
int ret;
rcu_read_lock();
/* Get the node reference for a call_rcu */
- lttng_ht_lookup(ust_app_ht_by_sock, (void *)((unsigned long) sock), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(ust_app_ht_by_sock, (void *)((unsigned long) sock), &ust_app_sock_iter);
+ node = lttng_ht_iter_get_node_ulong(&ust_app_sock_iter);
assert(node);
lta = caa_container_of(node, struct ust_app, sock_n);
DBG("PID %d unregistering with sock %d", lta->pid, sock);
- /* Remove application from PID hash table */
- ret = lttng_ht_del(ust_app_ht_by_sock, &iter);
- assert(!ret);
-
- /*
- * Remove application from notify hash table. The thread handling the
- * notify socket could have deleted the node so ignore on error because
- * either way it's valid. The close of that socket is handled by the other
- * thread.
- */
- iter.iter.node = <a->notify_sock_n.node;
- (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
-
/*
- * Ignore return value since the node might have been removed before by an
- * add replace during app registration because the PID can be reassigned by
- * the OS.
+ * Perform "push metadata" and flush all application streams
+ * before removing app from hash tables, ensuring proper
+ * behavior of data_pending check.
+ * Remove sessions so they are not visible during deletion.
*/
- iter.iter.node = <a->pid_n.node;
- ret = lttng_ht_del(ust_app_ht, &iter);
- if (ret) {
- DBG3("Unregister app by PID %d failed. This can happen on pid reuse",
- lta->pid);
- }
-
- /* Remove sessions so they are not visible during deletion.*/
cds_lfht_for_each_entry(lta->sessions->ht, &iter.iter, ua_sess,
node.node) {
struct ust_registry_session *registry;
continue;
}
+ (void) ust_app_flush_app_session(lta, ua_sess);
+
/*
* Add session to list for teardown. This is safe since at this point we
* are the only one using this list.
(void) close_metadata(registry, ua_sess->consumer);
}
}
-
cds_list_add(&ua_sess->teardown_node, <a->teardown_head);
+
pthread_mutex_unlock(&ua_sess->lock);
}
+ /* Remove application from PID hash table */
+ ret = lttng_ht_del(ust_app_ht_by_sock, &ust_app_sock_iter);
+ assert(!ret);
+
+ /*
+ * Remove application from notify hash table. The thread handling the
+ * notify socket could have deleted the node so ignore on error because
+ * either way it's valid. The close of that socket is handled by the other
+ * thread.
+ */
+ iter.iter.node = <a->notify_sock_n.node;
+ (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+
+ /*
+ * Ignore return value since the node might have been removed before by an
+ * add replace during app registration because the PID can be reassigned by
+ * the OS.
+ */
+ iter.iter.node = <a->pid_n.node;
+ ret = lttng_ht_del(ust_app_ht, &iter);
+ if (ret) {
+ DBG3("Unregister app by PID %d failed. This can happen on pid reuse",
+ lta->pid);
+ }
+
/* Free memory */
call_rcu(<a->pid_n.head, delete_ust_app_rcu);
return -1;
}
-/*
- * Flush buffers for a specific UST session and app.
- */
static
-int ust_app_flush_trace(struct ltt_ust_session *usess, struct ust_app *app)
+int ust_app_flush_app_session(struct ust_app *app,
+ struct ust_app_session *ua_sess)
{
- int ret = 0;
+ int ret, retval = 0;
struct lttng_ht_iter iter;
- struct ust_app_session *ua_sess;
struct ust_app_channel *ua_chan;
+ struct consumer_socket *socket;
- DBG("Flushing buffers for ust app pid %d", app->pid);
+ DBG("Flushing app session buffers for ust app pid %d", app->pid);
rcu_read_lock();
if (!app->compatible) {
- goto end_no_session;
- }
-
- ua_sess = lookup_session_by_app(usess, app);
- if (ua_sess == NULL) {
- goto end_no_session;
+ goto end_not_compatible;
}
pthread_mutex_lock(&ua_sess->lock);
health_code_update();
/* Flushing buffers */
+ socket = consumer_find_socket_by_bitness(app->bits_per_long,
+ ua_sess->consumer);
cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
node.node) {
health_code_update();
assert(ua_chan->is_sent);
- ret = ustctl_sock_flush_buffer(app->sock, ua_chan->obj);
- if (ret < 0) {
- if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
- ERR("UST app PID %d channel %s flush failed with ret %d",
- app->pid, ua_chan->name, ret);
- } else {
- DBG3("UST app failed to flush %s. Application is dead.",
- ua_chan->name);
- /*
- * This is normal behavior, an application can die during the
- * creation process. Don't report an error so the execution can
- * continue normally.
- */
- }
- /* Continuing flushing all buffers */
+ ret = consumer_flush_channel(socket, ua_chan->key);
+ if (ret) {
+ ERR("Error flushing consumer channel");
+ retval = -1;
continue;
}
}
health_code_update();
pthread_mutex_unlock(&ua_sess->lock);
+end_not_compatible:
+ rcu_read_unlock();
+ health_code_update();
+ return retval;
+}
+
+/*
+ * Flush buffers for a specific UST session and app.
+ */
+static
+int ust_app_flush_session(struct ust_app *app, struct ltt_ust_session *usess)
+
+{
+ int ret;
+ struct ust_app_session *ua_sess;
+
+ DBG("Flushing session buffers for ust app pid %d", app->pid);
+
+ rcu_read_lock();
+
+ ua_sess = lookup_session_by_app(usess, app);
+ if (ua_sess == NULL) {
+ ret = -1;
+ goto end_no_session;
+ }
+ ret = ust_app_flush_app_session(app, ua_sess);
+
end_no_session:
rcu_read_unlock();
health_code_update();
- return 0;
+ return ret;
}
/*
}
case LTTNG_BUFFER_PER_PID:
cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
- ret = ust_app_flush_trace(usess, app);
+ ret = ust_app_flush_session(app, usess);
if (ret < 0) {
/* Continue to next apps even on error */
continue;
cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
struct buffer_reg_channel *reg_chan;
+ rcu_read_lock();
cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
reg_chan, node.node) {
ret += reg_chan->stream_count;
}
+ rcu_read_unlock();
}
break;
}