Fix: sessiond: sessiond and agent deadlock on destroy
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 21 Apr 2020 21:08:10 +0000 (17:08 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 22 Apr 2020 16:26:04 +0000 (12:26 -0400)
Observed issue
--------------

While running the out-of-tree java agent tests [1], the session daemon
and agent often end up in a deadlock.

Attaching gdb to the session daemon, we can see that two threads are
blocked in an intriguing state.

Thread 13 (Thread 0x7f89027fc700 (LWP 9636)):
 #0  0x00007f891e81a4cf in __lll_lock_wait () from /usr/lib/libpthread.so.0
 #1  0x00007f891e812e03 in pthread_mutex_lock () from /usr/lib/libpthread.so.0
 #2  0x000055637f1fbd92 in session_lock_list () at session.c:156
 #3  0x000055637f25dc47 in update_agent_app (app=0x7f88ec003480) at agent-thread.c:56
 #4  0x000055637f25ec0a in thread_agent_management (data=0x556380cd2400) at agent-thread.c:426
 #5  0x000055637f22fb3a in launch_thread (data=0x556380cd24a0) at thread.c:65
 #6  0x00007f891e81046f in start_thread () from /usr/lib/libpthread.so.0
 #7  0x00007f891e7203d3 in clone () from /usr/lib/libc.so.6

Thread 8 (Thread 0x7f8919309700 (LWP 9631)):
 #0  0x00007f891e81b44d in recvmsg () from /usr/lib/libpthread.so.0
 #1  0x000055637f267847 in lttcomm_recvmsg_inet_sock (sock=0x7f88ec0033c0, buf=0x7f89192f5d5c, len=4, flags=0) at inet.c:367
 #2  0x000055637f2146c6 in recv_reply (sock=0x7f88ec0033c0, buf=0x7f89192f5d5c, size=4) at agent.c:275
 #3  0x000055637f215202 in app_context_op (app=0x7f88ec003400, ctx=0x7f8908020900, cmd=AGENT_CMD_APP_CTX_DISABLE) at agent.c:552
 #4  0x000055637f215c2d in disable_context (ctx=0x7f8908020900, domain=LTTNG_DOMAIN_JUL) at agent.c:841
 #5  0x000055637f217480 in agent_destroy (agt=0x7f890801dc20) at agent.c:1326
 #6  0x000055637f243448 in trace_ust_destroy_session (session=0x7f8908004010) at trace-ust.c:1408
 #7  0x000055637f1fd775 in session_release (ref=0x7f8908001e70) at session.c:873
 #8  0x000055637f1fb9ac in urcu_ref_put (ref=0x7f8908001e70, release=0x55637f1fd62a <session_release>) at /usr/include/urcu/ref.h:68
 #9  0x000055637f1fdad2 in session_put (session=0x7f8908000d10) at session.c:942
 #10 0x000055637f2369e6 in process_client_msg (cmd_ctx=0x7f890800e6e0, sock=0x7f8919308560, sock_error=0x7f8919308564) at client.c:2102
 #11 0x000055637f2375ab in thread_manage_clients (data=0x556380cd1840) at client.c:2347
 #12 0x000055637f22fb3a in launch_thread (data=0x556380cd18b0) at thread.c:65
 #13 0x00007f891e81046f in start_thread () from /usr/lib/libpthread.so.0
 #14 0x00007f891e7203d3 in clone () from /usr/lib/libc.so.6

T8 is holding session list lock while the cmd_destroy_session
command is being processed. More specifically, it is attempting
to destroy an "agent_context" by communicating with an "agent"
application.

Meanwhile, T13 is still registering that same "agent" application.

Cause
-----

The deadlock itself is pretty simple to understand.

The "agent thread" (T13) has the responsability of accepting new agent
application connections. When such a connection occurs, the thread
creates a new `agent_app` instance and sends the current sessions'
configuration (i.e. their event rules and contexts) to the agent
application. When that "update" is complete, a "registration done"
message is sent to the new agent application.

From the stacktrace above, we can see that T13 is attempting to update
the agent application with its initial configuration, but it is
blocked on the acquisition of the session list lock. The application's
agent is also blocked since it is waiting for the "registration done"
message before allowing tracing to proceed (not shown here, but seen
in the test logs).

Meanwhile, T8 is holding the session list lock while destroying a
session. This is expected as all client commands are executed with
this lock held. It is, amongst other reasons, used to serialize
changes to the sessions' configuration and configuration updates sent
to the tracers (i.e. because new apps appear or to keep existing
tracers in sync with the users' session configuration).

The question becomes: why is T8 tearing down an application that is
not yet registered?

First, inspecting `agent_app` immediately shows that this structure
has no built-in synchronization mechanism. Therefore, the fact that
two threads are accessing it at the same time raises a big red flag.

Speculating on the intentions of the original design, my intuition is
that the "agent_management" thread's role is limited to instantiating
an `agent_app` and synchronizing it with the various sessions'
configuration. Once that synchronization is performed, the agent
application should be published and never accessed again by the "agent
thread".

Configuration updates (i.e. new event rules, contexts) are then sent
synchronously as they are requested by a client in the context of the
client thread. Those updates are performed while holding the session
list lock.

Hence, there is only one thread that should manipulate the agent
application at any given time making an explicit `agent_app` lock
unnecessary.

Overall, this would echo what is done when a 'user space tracer'
application registers to the session daemon (see dispatch.c:368).

Evidently this isn't what is happening here.

The agent thread creates the `agent_app`, publishes it, and then
performs an "agent app update" (sending the configuration) while
holding the session list lock. This means that there is a window where
an agent application is visible to the other threads, yet has not been
properly registered.

Solution
--------

The acquisition of the session list lock is moved outside of
update_agent_app() to allow the "agent thread" to hold the session
list lock during the "configuration update" phase of the agent
application registration.

Essentially, the sequence of operation changes from:

- Agent tcp connection established
- call handle_registration()
  - agent version check
  - allocation of agent_app instance
  - new agent_add is published through the global agent_apps_ht_by_sock
    hashtable
    ***
    it is now reachable by all other threads without any form of
    exclusivity synchronization.
    ***
- update_agent_app
  - acquire session list lock
  - iterate over sessions
    - send configuration
  - release session list lock
- send registration done

to:

- Agent tcp connection established
- call accept_agent_registration()
  - agent version check
- allocation of agent_app instance
- acquire session list lock
- update_agent_app
  - iterate over sessions
    - send configuration
- send registration done
- new agent_add is published through the global agent_apps_ht_by_sock
  hashtable
- release session list lock

Links
-----

[1] https://github.com/lttng/lttng-ust-java-tests

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: Ia34c5ad81ed3936acbca756b425423e0cb8dbddf

src/bin/lttng-sessiond/agent-thread.c
src/bin/lttng-sessiond/agent.c
src/bin/lttng-sessiond/agent.h
src/bin/lttng-sessiond/session.h

index c278ce95e506daabf779636b2544dc799181b38e..06ef377a3a181c1b7528df89b076340f789c0f70 100644 (file)
@@ -28,6 +28,15 @@ struct thread_notifiers {
        sem_t ready;
 };
 
+struct agent_app_id {
+       pid_t pid;
+       enum lttng_domain_type domain;
+};
+
+struct agent_protocol_version {
+       unsigned int major, minor;
+};
+
 static int agent_tracing_enabled = -1;
 
 /*
@@ -42,10 +51,11 @@ static const char *default_reg_uri =
  * Update agent application using the given socket. This is done just after
  * registration was successful.
  *
- * This is a quite heavy call in terms of locking since the session list lock
- * AND session lock are acquired.
+ * This will acquire the various sessions' lock; none must be held by the
+ * caller.
+ * The caller must hold the session list lock.
  */
-static void update_agent_app(struct agent_app *app)
+static void update_agent_app(const struct agent_app *app)
 {
        struct ltt_session *session, *stmp;
        struct ltt_session_list *list;
@@ -53,7 +63,6 @@ static void update_agent_app(struct agent_app *app)
        list = session_get_list();
        assert(list);
 
-       session_lock_list();
        cds_list_for_each_entry_safe(session, stmp, &list->head, list) {
                if (!session_get(session)) {
                        continue;
@@ -61,19 +70,18 @@ static void update_agent_app(struct agent_app *app)
 
                session_lock(session);
                if (session->ust_session) {
-                       struct agent *agt;
+                       const struct agent *agt;
 
                        rcu_read_lock();
                        agt = trace_ust_find_agent(session->ust_session, app->domain);
                        if (agt) {
-                               agent_update(agt, app->sock->fd);
+                               agent_update(agt, app);
                        }
                        rcu_read_unlock();
                }
                session_unlock(session);
                session_put(session);
        }
-       session_unlock_list();
 }
 
 /*
@@ -188,23 +196,56 @@ static void destroy_tcp_socket(struct lttcomm_sock *sock)
        lttcomm_destroy_sock(sock);
 }
 
+static const char *domain_type_str(enum lttng_domain_type domain_type)
+{
+       switch (domain_type) {
+       case LTTNG_DOMAIN_NONE:
+               return "none";
+       case LTTNG_DOMAIN_KERNEL:
+               return "kernel";
+       case LTTNG_DOMAIN_UST:
+               return "ust";
+       case LTTNG_DOMAIN_JUL:
+               return "jul";
+       case LTTNG_DOMAIN_LOG4J:
+               return "log4j";
+       case LTTNG_DOMAIN_PYTHON:
+               return "python";
+       default:
+               return "unknown";
+       }
+}
+
+static bool is_agent_protocol_version_supported(
+               const struct agent_protocol_version *version)
+{
+       const bool is_supported = version->major == AGENT_MAJOR_VERSION &&
+                       version->minor == AGENT_MINOR_VERSION;
+
+       if (!is_supported) {
+               WARN("Refusing agent connection: unsupported protocol version %ui.%ui, expected %i.%i",
+                               version->major, version->minor,
+                               AGENT_MAJOR_VERSION, AGENT_MINOR_VERSION);
+       }
+
+       return is_supported;
+}
+
 /*
- * Handle a new agent registration using the reg socket. After that, a new
- * agent application is added to the global hash table and attach to an UST app
- * object. If r_app is not NULL, the created app is set to the pointer.
+ * Handle a new agent connection on the registration socket.
  *
- * Return the new FD created upon accept() on success or else a negative errno
- * value.
+ * Returns 0 on success, or else a negative errno value.
+ * On success, the resulting socket is returned through `agent_app_socket`
+ * and the application's reported id is updated through `agent_app_id`.
  */
-static int handle_registration(struct lttcomm_sock *reg_sock,
-               struct agent_app **r_app)
+static int accept_agent_connection(
+               struct lttcomm_sock *reg_sock,
+               struct agent_app_id *agent_app_id,
+               struct lttcomm_sock **agent_app_socket)
 {
        int ret;
-       pid_t pid;
-       uint32_t major_version, minor_version;
+       struct agent_protocol_version agent_version;
        ssize_t size;
-       enum lttng_domain_type domain;
-       struct agent_app *app;
        struct agent_register_msg msg;
        struct lttcomm_sock *new_sock;
 
@@ -213,60 +254,52 @@ static int handle_registration(struct lttcomm_sock *reg_sock,
        new_sock = reg_sock->ops->accept(reg_sock);
        if (!new_sock) {
                ret = -ENOTCONN;
-               goto error;
+               goto end;
        }
 
        size = new_sock->ops->recvmsg(new_sock, &msg, sizeof(msg), 0);
        if (size < sizeof(msg)) {
+               if (size < 0) {
+                       PERROR("Failed to register new agent application");
+               } else if (size != 0) {
+                       ERR("Failed to register new agent application: invalid registration message length: expected length = %zu, message length = %zd",
+                                       sizeof(msg), size);
+               } else {
+                       DBG("Failed to register new agent application: connection closed");
+               }
                ret = -EINVAL;
-               goto error_socket;
-       }
-       domain = be32toh(msg.domain);
-       pid = be32toh(msg.pid);
-       major_version = be32toh(msg.major_version);
-       minor_version = be32toh(msg.minor_version);
-
-       /* Test communication protocol version of the registring agent. */
-       if (major_version != AGENT_MAJOR_VERSION) {
-               ret = -EINVAL;
-               goto error_socket;
-       }
-       if (minor_version != AGENT_MINOR_VERSION) {
-               ret = -EINVAL;
-               goto error_socket;
+               goto error_close_socket;
        }
 
-       DBG2("[agent-thread] New registration for pid %d domain %d on socket %d",
-                       pid, domain, new_sock->fd);
+       agent_version = (struct agent_protocol_version) {
+               be32toh(msg.major_version),
+               be32toh(msg.minor_version),
+       };
 
-       app = agent_create_app(pid, domain, new_sock);
-       if (!app) {
-               ret = -ENOMEM;
-               goto error_socket;
+       /* Test communication protocol version of the registering agent. */
+       if (!is_agent_protocol_version_supported(&agent_version)) {
+               ret = -EINVAL;
+               goto error_close_socket;
        }
 
-       /*
-        * Add before assigning the socket value to the UST app so it can be found
-        * concurrently.
-        */
-       agent_add_app(app);
-
-       /*
-        * We don't need to attach the agent app to the app. If we ever do so, we
-        * should consider both registration order of agent before app and app
-        * before agent.
-        */
+       *agent_app_id = (struct agent_app_id) {
+               .domain = (enum lttng_domain_type) be32toh(msg.domain),
+               .pid = (pid_t) be32toh(msg.pid),
+       };
 
-       if (r_app) {
-               *r_app = app;
-       }
+       DBG2("New registration for agent application: pid = %ld, domain = %s, socket fd = %d",
+                       (long) agent_app_id->pid,
+                       domain_type_str(agent_app_id->domain), new_sock->fd);
 
-       return new_sock->fd;
+       *agent_app_socket = new_sock;
+       new_sock = NULL;
+       ret = 0;
+       goto end;
 
-error_socket:
+error_close_socket:
        new_sock->ops->close(new_sock);
        lttcomm_destroy_sock(new_sock);
-error:
+end:
        return ret;
 }
 
@@ -362,7 +395,7 @@ static void *thread_agent_management(void *data)
        uatomic_set(&agent_tracing_enabled, 1);
        mark_thread_as_ready(notifiers);
 
-       /* Add TCP socket to poll set. */
+       /* Add TCP socket to the poll set. */
        ret = lttng_poll_add(&events, reg_sock->fd,
                        LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
        if (ret < 0) {
@@ -399,43 +432,79 @@ restart:
                                goto exit;
                        }
 
+                       /* Activity on the registration socket. */
                        if (revents & LPOLLIN) {
-                               int new_fd;
-                               struct agent_app *app = NULL;
+                               struct agent_app_id new_app_id;
+                               struct agent_app *new_app = NULL;
+                               struct lttcomm_sock *new_app_socket;
+                               int new_app_socket_fd;
 
                                assert(pollfd == reg_sock->fd);
-                               new_fd = handle_registration(reg_sock, &app);
-                               if (new_fd < 0) {
+
+                               ret = accept_agent_connection(
+                                       reg_sock, &new_app_id, &new_app_socket);
+                               if (ret < 0) {
+                                       /* Errors are already logged. */
                                        continue;
                                }
-                               /* Should not have a NULL app on success. */
-                               assert(app);
 
                                /*
-                                * Since this is a command socket (write then read),
-                                * only add poll error event to only detect shutdown.
+                                * new_app_socket's ownership has been
+                                * transferred to the new agent app.
                                 */
-                               ret = lttng_poll_add(&events, new_fd,
+                               new_app = agent_create_app(new_app_id.pid,
+                                               new_app_id.domain,
+                                               new_app_socket);
+                               if (!new_app) {
+                                       new_app_socket->ops->close(
+                                                       new_app_socket);
+                                       continue;
+                               }
+                               new_app_socket_fd = new_app_socket->fd;
+                               new_app_socket = NULL;
+
+                               /*
+                                * Since this is a command socket (write then
+                                * read), only add poll error event to only
+                                * detect shutdown.
+                                */
+                               ret = lttng_poll_add(&events, new_app_socket_fd,
                                                LPOLLERR | LPOLLHUP | LPOLLRDHUP);
                                if (ret < 0) {
-                                       agent_destroy_app_by_sock(new_fd);
+                                       agent_destroy_app(new_app);
                                        continue;
                                }
 
-                               /* Update newly registered app. */
-                               update_agent_app(app);
+                               /*
+                                * Prevent sessions from being modified while
+                                * the agent application's configuration is
+                                * updated.
+                                */
+                               session_lock_list();
+
+                               /*
+                                * Update the newly registered applications's
+                                * configuration.
+                                */
+                               update_agent_app(new_app);
 
-                               /* On failure, the poll will detect it and clean it up. */
-                               ret = agent_send_registration_done(app);
+                               ret = agent_send_registration_done(new_app);
                                if (ret < 0) {
-                                       /* Removing from the poll set */
-                                       ret = lttng_poll_del(&events, new_fd);
+                                       agent_destroy_app(new_app);
+                                       /* Removing from the poll set. */
+                                       ret = lttng_poll_del(&events,
+                                                       new_app_socket_fd);
                                        if (ret < 0) {
+                                               session_unlock_list();
                                                goto error;
                                        }
-                                       agent_destroy_app_by_sock(new_fd);
                                        continue;
                                }
+
+                               /* Publish the new agent app. */
+                               agent_add_app(new_app);
+
+                               session_unlock_list();
                        } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                /* Removing from the poll set */
                                ret = lttng_poll_del(&events, pollfd);
index 1926b0820fc808f4076e12b8110a4ce5fe34cbae..e5978be5cef3ac24ecc7b7255cff563f87b13580 100644 (file)
@@ -380,7 +380,7 @@ error:
  *
  * Return LTTNG_OK on success or else a LTTNG_ERR* code.
  */
-static int enable_event(struct agent_app *app, struct agent_event *event)
+static int enable_event(const struct agent_app *app, struct agent_event *event)
 {
        int ret;
        char *bytes_to_send;
@@ -495,8 +495,8 @@ end:
  *
  * Return LTTNG_OK on success or else a LTTNG_ERR* code.
  */
-static int app_context_op(struct agent_app *app,
-               struct agent_app_ctx *ctx, enum lttcomm_agent_command cmd)
+static int app_context_op(const struct agent_app *app,
+               const struct agent_app_ctx *ctx, enum lttcomm_agent_command cmd)
 {
        int ret;
        uint32_t reply_ret_code;
@@ -946,7 +946,7 @@ struct agent_app *agent_create_app(pid_t pid, enum lttng_domain_type domain,
 
        app = zmalloc(sizeof(*app));
        if (!app) {
-               PERROR("zmalloc agent create");
+               PERROR("Failed to allocate agent application instance");
                goto error;
        }
 
@@ -1402,26 +1402,24 @@ void agent_app_ht_clean(void)
  * Note that this function is most likely to be used with a tracing session
  * thus the caller should make sure to hold the appropriate lock(s).
  */
-void agent_update(struct agent *agt, int sock)
+void agent_update(const struct agent *agt, const struct agent_app *app)
 {
        int ret;
-       struct agent_app *app;
        struct agent_event *event;
        struct lttng_ht_iter iter;
        struct agent_app_ctx *ctx;
 
        assert(agt);
-       assert(sock >= 0);
+       assert(app);
 
-       DBG("Agent updating app socket %d", sock);
+       DBG("Agent updating app: pid = %ld", (long) app->pid);
 
        rcu_read_lock();
-       app = agent_find_app_by_sock(sock);
        /*
         * We are in the registration path thus if the application is gone,
         * there is a serious code flow error.
         */
-       assert(app);
+
        cds_lfht_for_each_entry(agt->events->ht, &iter.iter, event, node.node) {
                /* Skip event if disabled. */
                if (!event->enabled) {
index 2d2d6425559d2330904bd9d6c83eb24dbcba7872..f8e67efdae82b7130e951d721859841991fa2e76 100644 (file)
@@ -163,7 +163,7 @@ int agent_enable_event(struct agent_event *event,
                enum lttng_domain_type domain);
 int agent_disable_event(struct agent_event *event,
                enum lttng_domain_type domain);
-void agent_update(struct agent *agt, int sock);
+void agent_update(const struct agent *agt, const struct agent_app *app);
 int agent_list_events(struct lttng_event **events,
                enum lttng_domain_type domain);
 
index a58de18f1d632ac2a6d782cd31ddc017079b38b0..1df70a4747a8341d93b8b3185603ccddcfb23782 100644 (file)
@@ -195,13 +195,22 @@ struct ltt_session {
        char *base_path;
 };
 
-/* Prototypes */
 enum lttng_error_code session_create(const char *name, uid_t uid, gid_t gid,
                struct ltt_session **out_session);
 void session_lock(struct ltt_session *session);
+void session_unlock(struct ltt_session *session);
+
+/*
+ * The session list lock covers more ground than its name implies. While
+ * it does protect against concurent mutations of the session list, it is
+ * also used as a multi-session lock when synchronizing newly-registered
+ * 'user space tracer' and 'agent' applications.
+ *
+ * In other words, it prevents session configurations from changing while they
+ * are being transmitted to the various applications.
+ */
 void session_lock_list(void);
 int session_trylock_list(void);
-void session_unlock(struct ltt_session *session);
 void session_unlock_list(void);
 
 void session_destroy(struct ltt_session *session);
This page took 0.032973 seconds and 4 git commands to generate.