Explorar el Código

Merge pull request #6113 from HJianBo/fix-force-kill-after-kick-or-discard-timeout-5

Fix force kill after kick or discard timeout
JianBo He hace 4 años
padre
commit
c8f2aa13bb
Se han modificado 3 ficheros con 252 adiciones y 128 borrados
  1. 103 51
      apps/emqx/src/emqx_cm.erl
  2. 94 38
      apps/emqx/test/emqx_cm_SUITE.erl
  3. 55 39
      apps/emqx_management/src/emqx_mgmt_cli.erl

+ 103 - 51
apps/emqx/src/emqx_cm.erl

@@ -74,6 +74,7 @@
 
 %% Internal export
 -export([ stats_fun/0
+        , clean_down/1
         , mark_channel_connected/1
         , mark_channel_disconnected/1
         , get_connected_client_count/0
@@ -100,7 +101,9 @@
 %% Server name
 -define(CM, ?MODULE).
 
--define(T_TAKEOVER, 15000).
+-define(T_KICK, 5_000).
+-define(T_GET_INFO, 5_000).
+-define(T_TAKEOVER, 15_000).
 
 %% linting overrides
 -elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}
@@ -176,7 +179,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
         error:badarg -> undefined
     end;
 get_chan_info(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
 
 %% @doc Update infos of the channel.
 -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@@ -201,7 +204,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
         error:badarg -> undefined
     end;
 get_chan_stats(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
 
 %% @doc Set channel's stats.
 -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@@ -336,77 +339,120 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
         undefined ->
             emqx_persistent_session:lookup(ClientId);
         ConnMod when is_atom(ConnMod) ->
+            %% TODO: if takeover times out, maybe kill the old?
             Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
             {living, ConnMod, ChanPid, Session}
     end;
-
 takeover_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
 
 %% @doc Discard all the sessions identified by the ClientId.
 -spec(discard_session(emqx_types:clientid()) -> ok).
 discard_session(ClientId) when is_binary(ClientId) ->
     case lookup_channels(ClientId) of
         [] -> ok;
-        ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
+        ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
     end.
 
-do_discard_session(ClientId, Pid) ->
+%% @private Kick a local stale session to force it step down.
+%% If failed to kick (e.g. timeout) force a kill.
+%% Keeping the stale pid around, or returning error or raise an exception
+%% benefits nobody.
+-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
+kick_or_kill(Action, ConnMod, Pid) ->
     try
-        discard_session(ClientId, Pid)
+        %% this is essentailly a gen_server:call implemented in emqx_connection
+        %% and emqx_ws_connection.
+        %% the handle_call is implemented in emqx_channel
+        ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
     catch
         _ : noproc -> % emqx_ws_connection: call
-            ?tp(debug, "session_already_gone", #{pid => Pid}),
-            ok;
+            ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
         _ : {noproc, _} -> % emqx_connection: gen_server:call
-            ?tp(debug, "session_already_gone", #{pid => Pid}),
-            ok;
-        _ : {'EXIT', {noproc, _}} -> % rpc_call/3
-            ?tp(debug, "session_already_gone", #{pid => Pid}),
-            ok;
+            ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
+        _ : {shutdown, _} ->
+            ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
         _ : {{shutdown, _}, _} ->
-            ?tp(debug, "session_already_shutdown", #{pid => Pid}),
-            ok;
+            ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
+        _ : {timeout, {gen_server, call, _}} ->
+            ?tp(warning, "session_kick_timeout",
+                #{pid => Pid,
+                  action => Action,
+                  stale_channel => stale_channel_info(Pid)
+                 }),
+            ok = force_kill(Pid);
         _ : Error : St ->
-            ?tp(error, "failed_to_discard_session",
-                #{pid => Pid, reason => Error, stacktrace=>St})
+            ?tp(error, "session_kick_exception",
+                #{pid => Pid,
+                  action => Action,
+                  reason => Error,
+                  stacktrace => St,
+                  stale_channel => stale_channel_info(Pid)
+                 }),
+            ok = force_kill(Pid)
     end.
 
-discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
+force_kill(Pid) ->
+    exit(Pid, kill),
+    ok.
+
+stale_channel_info(Pid) ->
+    process_info(Pid, [status, message_queue_len, current_stacktrace]).
+
+discard_session(ClientId, ChanPid) ->
+    kick_session(discard, ClientId, ChanPid).
+
+kick_session(ClientId, ChanPid) ->
+    kick_session(kick, ClientId, ChanPid).
+
+%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
+kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
     case get_chann_conn_mod(ClientId, ChanPid) of
-        undefined -> ok;
+        undefined ->
+            %% already deregistered
+            ok;
         ConnMod when is_atom(ConnMod) ->
-            ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
+            ok = kick_or_kill(Action, ConnMod, ChanPid)
     end;
-
-discard_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
+kick_session(Action, ClientId, ChanPid) ->
+    %% call remote node on the old APIs because we do not know if they have upgraded
+    %% to have kick_session/3
+    Function = case Action of
+                   discard -> discard_session;
+                   kick -> kick_session
+               end,
+    try
+        rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
+    catch
+        Error : Reason ->
+            %% This should mostly be RPC failures.
+            %% However, if the node is still running the old version
+            %% code (prior to emqx app 4.3.10) some of the RPC handler
+            %% exceptions may get propagated to a new version node
+            ?SLOG(error, #{ msg => "failed_to_kick_session_on_remote_node"
+                          , node => node(ChanPid)
+                          , action => Action
+                          , error => Error
+                          , reason => Reason
+                          })
+    end.
 
 kick_session(ClientId) ->
     case lookup_channels(ClientId) of
-        [] -> {error, not_found};
-        [ChanPid] ->
-            kick_session(ClientId, ChanPid);
+        [] ->
+            ?SLOG(warning, #{msg => "kicked_an_unknown_session",
+                             clientid => ClientId}),
+            ok;
         ChanPids ->
-            [ChanPid | StalePids] = lists:reverse(ChanPids),
-            ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}),
-            lists:foreach(fun(StalePid) ->
-                                  catch discard_session(ClientId, StalePid)
-                          end, StalePids),
-            kick_session(ClientId, ChanPid)
+            case length(ChanPids) > 1 of
+                true ->
+                    ?SLOG(warning, #{msg => "more_than_one_channel_found",
+                                     chan_pids => ChanPids});
+                false -> ok
+            end,
+            lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
     end.
 
-kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
-    case get_chan_info(ClientId, ChanPid) of
-        #{conninfo := #{conn_mod := ConnMod}} ->
-            ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
-        undefined ->
-            {error, not_found}
-    end;
-
-kick_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
-
 %% @doc Is clean start?
 % is_clean_start(#{clean_start := false}) -> false;
 % is_clean_start(_Attrs) -> true.
@@ -448,10 +494,16 @@ lookup_channels(local, ClientId) ->
     [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
 
 %% @private
-rpc_call(Node, Fun, Args) ->
-    case rpc:call(Node, ?MODULE, Fun, Args) of
-        {badrpc, Reason} -> error(Reason);
-        Res -> Res
+rpc_call(Node, Fun, Args, Timeout) ->
+    case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
+        {badrpc, Reason} ->
+            %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
+            %% should catch all exceptions and always return 'ok'.
+            %% This leaves 'badrpc' only possible when there is problem
+            %% calling the remote node.
+            error({badrpc, Reason});
+        Res ->
+            Res
     end.
 
 %% @private
@@ -491,7 +543,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
               mark_channel_disconnected(ChanPid)
       end,
       Items),
-    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
+    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
     {noreply, State#{chan_pmon := PMon1}};
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
@@ -527,7 +579,7 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
         error:badarg -> undefined
     end;
 get_chann_conn_mod(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
 
 mark_channel_connected(ChanPid) ->
     ?tp(emqx_cm_connected_client_count_inc, #{}),

+ 94 - 38
apps/emqx/test/emqx_cm_SUITE.erl

@@ -32,6 +32,12 @@
                      conn_mod => emqx_connection,
                      receive_maximum => 100}}).
 
+-define(WAIT(PATTERN, TIMEOUT, RET),
+        fun() ->
+                receive PATTERN -> RET
+                after TIMEOUT -> error({timeout, ?LINE}) end
+        end()).
+
 %%--------------------------------------------------------------------
 %% CT callbacks
 %%--------------------------------------------------------------------
@@ -179,29 +185,100 @@ t_open_session_race_condition(_) ->
 
     exit(Winner, kill),
     receive {'DOWN', _, process, Winner, _} -> ok end,
-    ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync
+    ignored = gen_server:call(?CM, ignore, infinity), %% sync
     ok = emqx_pool:flush_async_tasks(),
     ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
-t_discard_session(_) ->
+t_kick_session_discard_normal(_) ->
+    test_kick_session(discard, normal).
+
+t_kick_session_discard_shutdown(_) ->
+    test_kick_session(discard, shutdown).
+
+t_kick_session_discard_shutdown_with_reason(_) ->
+    test_kick_session(discard, {shutdown, discard}).
+
+t_kick_session_discard_timeout(_) ->
+    test_kick_session(discard, timeout).
+
+t_kick_session_discard_noproc(_) ->
+    test_kick_session(discard, noproc).
+
+t_kick_session_kick_normal(_) ->
+    test_kick_session(discard, normal).
+
+t_kick_session_kick_shutdown(_) ->
+    test_kick_session(discard, shutdown).
+
+t_kick_session_kick_shutdown_with_reason(_) ->
+    test_kick_session(discard, {shutdown, discard}).
+
+t_kick_session_kick_timeout(_) ->
+    test_kick_session(discard, timeout).
+
+t_kick_session_kick_noproc(_) ->
+    test_kick_session(discard, noproc).
+
+test_kick_session(Action, Reason) ->
     ClientId = rand_client_id(),
     #{conninfo := ConnInfo} = ?ChanInfo,
-    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
+    FakeSessionFun =
+        fun Loop() ->
+                     receive
+                         {'$gen_call', From, A} when A =:= kick orelse
+                                                     A =:= discard ->
+                             case Reason of
+                                 normal ->
+                                     gen_server:reply(From, ok);
+                                 timeout ->
+                                     %% no response to the call
+                                     Loop();
+                                 _ ->
+                                     exit(Reason)
+                             end;
+                         Msg ->
+                             ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]),
+                             Loop()
+                     end
+        end,
+    {Pid1, _} = spawn_monitor(FakeSessionFun),
+    {Pid2, _} = spawn_monitor(FakeSessionFun),
+    ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
+    ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
+    ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo),
+    ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))),
+    case Reason of
+        noproc -> exit(Pid1, kill), exit(Pid2, kill);
+        _ -> ok
+    end,
+    ok = case Action of
+             kick -> emqx_cm:kick_session(ClientId);
+             discard -> emqx_cm:discard_session(ClientId)
+         end,
+    case Reason =:= timeout orelse Reason =:= noproc of
+        true ->
+            ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),
+            ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R));
+        false ->
+            ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),
+            ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R))
+    end,
+    ignored = gen_server:call(?CM, ignore, infinity), % sync
+    ok = flush_emqx_pool(),
+    ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
-    ok = meck:new(emqx_connection, [passthrough, no_history]),
-    ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
-    ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = emqx_cm:unregister_channel(ClientId),
-    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
-    ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = emqx_cm:unregister_channel(ClientId),
-    ok = meck:unload(emqx_connection).
+%% Channel deregistration is delegated to emqx_pool as a sync tasks.
+%% The emqx_pool is pool of workers, and there is no way to know
+%% which worker was picked for the last deregistration task.
+%% This help function creates a large enough number of async tasks
+%% to sync with the pool workers.
+%% The number of tasks should be large enough to ensure all workers have
+%% the chance to work on at least one of the tasks.
+flush_emqx_pool() ->
+    Self = self(),
+    L = lists:seq(1, 1000),
+    lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
+    lists:foreach(fun(I) -> receive {done, I} -> ok end end, L).
 
 t_discard_session_race(_) ->
     ClientId = rand_client_id(),
@@ -234,27 +311,6 @@ t_takeover_session(_) ->
     {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
     emqx_cm:unregister_channel(<<"clientid">>).
 
-t_kick_session(_) ->
-    Info = #{conninfo := ConnInfo} = ?ChanInfo,
-    ok = meck:new(emqx_connection, [passthrough, no_history]),
-    ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
-    ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
-    {error, not_found} = emqx_cm:kick_session(<<"clientid">>),
-    ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
-    ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),
-    test = emqx_cm:kick_session(<<"clientid">>),
-    erlang:spawn_link(
-        fun() ->
-            ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
-            ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),
-
-            timer:sleep(1000)
-        end),
-    ct:sleep(100),
-    test = emqx_cm:kick_session(<<"clientid">>),
-    ok = emqx_cm:unregister_channel(<<"clientid">>),
-    ok = meck:unload(emqx_connection).
-
 t_all_channels(_) ->
     ?assertEqual(true, is_list(emqx_cm:all_channels())).
 

+ 55 - 39
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -79,10 +79,12 @@ broker([]) ->
     emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]);
 
 broker(["stats"]) ->
-    [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) || {Stat, Val} <- lists:sort(emqx_stats:getstats())];
+    [emqx_ctl:print("~-30s: ~w~n", [Stat, Val])
+     || {Stat, Val} <- lists:sort(emqx_stats:getstats())];
 
 broker(["metrics"]) ->
-    [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqx_metrics:all())];
+    [emqx_ctl:print("~-30s: ~w~n", [Metric, Val])
+     || {Metric, Val} <- lists:sort(emqx_metrics:all())];
 
 broker(_) ->
     emqx_ctl:usage([{"broker",         "Show broker version, uptime and description"},
@@ -142,10 +144,8 @@ clients(["show", ClientId]) ->
     if_client(ClientId, fun print/1);
 
 clients(["kick", ClientId]) ->
-    case emqx_cm:kick_session(bin(ClientId)) of
-        ok -> emqx_ctl:print("ok~n");
-        _ -> emqx_ctl:print("Not Found.~n")
-    end;
+    ok = emqx_cm:kick_session(bin(ClientId)),
+    emqx_ctl:print("ok~n");
 
 clients(_) ->
     emqx_ctl:usage([{"clients list",            "List all clients"},
@@ -209,10 +209,11 @@ subscriptions(["del", ClientId, Topic]) ->
     end;
 
 subscriptions(_) ->
-    emqx_ctl:usage([{"subscriptions list",                         "List all subscriptions"},
-                    {"subscriptions show <ClientId>",              "Show subscriptions of a client"},
-                    {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
-                    {"subscriptions del <ClientId> <Topic>",       "Delete a static subscription manually"}]).
+    emqx_ctl:usage(
+      [{"subscriptions list",                         "List all subscriptions"},
+       {"subscriptions show <ClientId>",              "Show subscriptions of a client"},
+       {"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
+       {"subscriptions del <ClientId> <Topic>",       "Delete a static subscription manually"}]).
 
 if_valid_qos(QoS, Fun) ->
     try list_to_integer(QoS) of
@@ -281,14 +282,17 @@ vm(["memory"]) ->
     [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
 
 vm(["process"]) ->
-    [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{limit, process_limit}, {count, process_count}]];
+    [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)])
+     || {Name, Key} <- [{limit, process_limit}, {count, process_count}]];
 
 vm(["io"]) ->
     IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))),
-    [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) || Key <- [max_fds, active_fds]];
+    [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)])
+     || Key <- [max_fds, active_fds]];
 
 vm(["ports"]) ->
-    [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{count, port_count}, {limit, port_limit}]];
+    [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)])
+     || {Name, Key} <- [{count, port_count}, {limit, port_limit}]];
 
 vm(_) ->
     emqx_ctl:usage([{"vm all",     "Show info of Erlang VM"},
@@ -325,8 +329,14 @@ log(["primary-level", Level]) ->
     emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]);
 
 log(["handlers", "list"]) ->
-    _ = [emqx_ctl:print("LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n", [Id, Level, Dst, Status])
-        || #{id := Id, level := Level, dst := Dst, status := Status} <- emqx_logger:get_log_handlers()],
+    _ = [emqx_ctl:print(
+            "LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n",
+            [Id, Level, Dst, Status]
+          )
+        || #{id := Id,
+             level := Level,
+             dst := Dst,
+             status := Status} <- emqx_logger:get_log_handlers()],
     ok;
 
 log(["handlers", "start", HandlerId]) ->
@@ -353,21 +363,25 @@ log(["handlers", "set-level", HandlerId, Level]) ->
     end;
 
 log(_) ->
-    emqx_ctl:usage([{"log set-level <Level>", "Set the overall log level"},
-                    {"log primary-level", "Show the primary log level now"},
-                    {"log primary-level <Level>","Set the primary log level"},
-                    {"log handlers list", "Show log handlers"},
-                    {"log handlers start <HandlerId>", "Start a log handler"},
-                    {"log handlers stop  <HandlerId>", "Stop a log handler"},
-                    {"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}]).
+    emqx_ctl:usage(
+      [{"log set-level <Level>", "Set the overall log level"},
+       {"log primary-level", "Show the primary log level now"},
+       {"log primary-level <Level>","Set the primary log level"},
+       {"log handlers list", "Show log handlers"},
+       {"log handlers start <HandlerId>", "Start a log handler"},
+       {"log handlers stop  <HandlerId>", "Stop a log handler"},
+       {"log handlers set-level <HandlerId> <Level>", "Set log level of a log handler"}]).
 
 %%--------------------------------------------------------------------
 %% @doc Trace Command
 
 trace(["list"]) ->
     lists:foreach(fun({{Who, Name}, {Level, LogFile}}) ->
-                emqx_ctl:print("Trace(~ts=~ts, level=~ts, destination=~p)~n", [Who, Name, Level, LogFile])
-            end, emqx_tracer:lookup_traces());
+        emqx_ctl:print(
+            "Trace(~ts=~ts, level=~ts, destination=~p)~n",
+            [Who, Name, Level, LogFile]
+         )
+    end, emqx_tracer:lookup_traces());
 
 trace(["stop", "client", ClientId]) ->
     trace_off(clientid, ClientId);
@@ -491,10 +505,11 @@ authz(["cache-clean", ClientId]) ->
     emqx_mgmt:clean_authz_cache(ClientId);
 
 authz(_) ->
-    emqx_ctl:usage([{"authz cache-clean all",             "Clears authorization cache on all nodes"},
-                    {"authz cache-clean node <Node>",     "Clears authorization cache on given node"},
-                    {"authz cache-clean <ClientId>",      "Clears authorization cache for given client"}
-                   ]).
+    emqx_ctl:usage(
+      [{"authz cache-clean all",         "Clears authorization cache on all nodes"},
+       {"authz cache-clean node <Node>", "Clears authorization cache on given node"},
+       {"authz cache-clean <ClientId>",  "Clears authorization cache for given client"}
+      ]).
 
 
 %%--------------------------------------------------------------------
@@ -562,23 +577,24 @@ print({client, {ClientId, ChanPid}}) ->
                         maps:with([peername, clean_start, keepalive, expiry_interval,
                                    connected_at, disconnected_at], ConnInfo),
                         maps:with([created_at], Session)]),
-    InfoKeys = [clientid, username, peername,
-                clean_start, keepalive, expiry_interval,
-                subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
+    InfoKeys = [clientid, username, peername, clean_start, keepalive,
+                expiry_interval, subscriptions_cnt, inflight_cnt,
+                awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
                 connected, created_at, connected_at] ++
                 case maps:is_key(disconnected_at, Info) of
                     true  -> [disconnected_at];
                     false -> []
                 end,
     Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
-    emqx_ctl:print("Client(~ts, username=~ts, peername=~ts, "
-                    "clean_start=~ts, keepalive=~w, session_expiry_interval=~w, "
-                    "subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, "
-                    "connected=~ts, created_at=~w, connected_at=~w" ++
-                    case maps:is_key(disconnected_at, Info1) of
-                        true  -> ", disconnected_at=~w)~n";
-                        false -> ")~n"
-                    end,
+    emqx_ctl:print(
+        "Client(~ts, username=~ts, peername=~ts, clean_start=~ts, "
+        "keepalive=~w, session_expiry_interval=~w, subscriptions=~w, "
+        "inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, "
+        "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w"
+        ++ case maps:is_key(disconnected_at, Info1) of
+               true  -> ", disconnected_at=~w)~n";
+               false -> ")~n"
+           end,
         [format(K, maps:get(K, Info1)) || K <- InfoKeys]);
 
 print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->