|
|
@@ -145,6 +145,12 @@
|
|
|
(is_binary(CLIENTID) orelse (is_atom(CLIENTID) andalso CLIENTID =/= undefined))
|
|
|
).
|
|
|
|
|
|
+-define(REQ_DOWN_ERR(MOD, PID, ACTION), (#{conn_mod => MOD, stale_pid => PID, action => ACTION})).
|
|
|
+
|
|
|
+-define(REQ_DOWN_ERR_CHANNEL_INFO(MOD, PID, ACTION),
|
|
|
+ (?REQ_DOWN_ERR(MOD, PID, ACTION)#{stale_channel => stale_channel_info(PID)})
|
|
|
+).
|
|
|
+
|
|
|
%% linting overrides
|
|
|
-elvis([
|
|
|
{elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}},
|
|
|
@@ -402,12 +408,8 @@ takeover_session(ClientId, Pid) ->
|
|
|
try
|
|
|
do_takeover_begin(ClientId, Pid)
|
|
|
catch
|
|
|
- _:R when
|
|
|
- R == noproc;
|
|
|
- R == timeout;
|
|
|
- %% request_stepdown/3
|
|
|
- R == unexpected_exception
|
|
|
- ->
|
|
|
+ %% request_stepdown/3
|
|
|
+ _:R when R == noproc; R == timeout; R == unexpected_exception ->
|
|
|
none;
|
|
|
% rpc_call/3
|
|
|
_:{'EXIT', {noproc, _}} ->
|
|
|
@@ -461,55 +463,64 @@ request_stepdown(Action, ConnMod, Pid) ->
|
|
|
_ -> ?T_TAKEOVER
|
|
|
end,
|
|
|
Return =
|
|
|
- %% this is essentially a gen_server:call implemented in emqx_connection
|
|
|
- %% and emqx_ws_connection.
|
|
|
- %% the handle_call is implemented in emqx_channel
|
|
|
try apply(ConnMod, call, [Pid, Action, Timeout]) of
|
|
|
ok -> ok;
|
|
|
Reply -> {ok, Reply}
|
|
|
catch
|
|
|
- % emqx_ws_connection: call
|
|
|
- _:noproc ->
|
|
|
- ok = ?tp(debug, "session_already_gone", #{stale_pid => Pid, action => Action}),
|
|
|
- {error, noproc};
|
|
|
- % emqx_connection: gen_server:call
|
|
|
- _:{noproc, _} ->
|
|
|
- ok = ?tp(debug, "session_already_gone", #{stale_pid => Pid, action => Action}),
|
|
|
- {error, noproc};
|
|
|
- _:{shutdown, _} ->
|
|
|
- ok = ?tp(debug, "session_already_shutdown", #{stale_pid => Pid, action => Action}),
|
|
|
- {error, noproc};
|
|
|
- _:{{shutdown, _}, _} ->
|
|
|
- ok = ?tp(debug, "session_already_shutdown", #{stale_pid => Pid, action => Action}),
|
|
|
- {error, noproc};
|
|
|
- _:{timeout, {gen_server, call, _}} ->
|
|
|
- ?tp(
|
|
|
- warning,
|
|
|
- "session_stepdown_request_timeout",
|
|
|
- #{stale_pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}
|
|
|
- ),
|
|
|
- ok = force_kill(Pid),
|
|
|
- {error, timeout};
|
|
|
- _:Error:St ->
|
|
|
- ?tp(
|
|
|
- error,
|
|
|
- "session_stepdown_request_exception",
|
|
|
- #{
|
|
|
- stale_pid => Pid,
|
|
|
- action => Action,
|
|
|
- reason => Error,
|
|
|
- stacktrace => St,
|
|
|
- stale_channel => stale_channel_info(Pid)
|
|
|
- }
|
|
|
- ),
|
|
|
- ok = force_kill(Pid),
|
|
|
- {error, unexpected_exception}
|
|
|
+ Err:Reason:St ->
|
|
|
+ handle_stepdown_exception(Err, Reason, St, ConnMod, Pid, Action)
|
|
|
end,
|
|
|
case Action == kick orelse Action == discard of
|
|
|
true -> ok;
|
|
|
_ -> Return
|
|
|
end.
|
|
|
|
|
|
+%% The emqx_connection returns `{Reason, {gen_server, call, _}}` on failure, but
|
|
|
+%% emqx_ws_connection returns `Reason`.
|
|
|
+handle_stepdown_exception(_Err, noproc, _St, ConnMod, Pid, Action) ->
|
|
|
+ ok = ?tp(debug, "ws_session_already_gone", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, {noproc, _}, _St, ConnMod, Pid, Action) ->
|
|
|
+ ok = ?tp(debug, "session_already_gone", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, {shutdown, _}, _St, ConnMod, Pid, Action) ->
|
|
|
+ ok = ?tp(debug, "ws_session_already_shutdown", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, {{shutdown, _}, _}, _St, ConnMod, Pid, Action) ->
|
|
|
+ ok = ?tp(debug, "session_already_shutdown", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, killed, _St, ConnMod, Pid, Action) ->
|
|
|
+ ?tp(debug, "ws_session_already_killed", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, {killed, {gen_server, call, _}}, _St, ConnMod, Pid, Action) ->
|
|
|
+ ?tp(debug, "session_already_killed", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, normal, _St, ConnMod, Pid, Action) ->
|
|
|
+ ?tp(debug, "ws_session_already_stopped_normally", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, {normal, {gen_server, call, _}}, _St, ConnMod, Pid, Action) ->
|
|
|
+ ?tp(debug, "session_already_stopped_normally", ?REQ_DOWN_ERR(ConnMod, Pid, Action)),
|
|
|
+ {error, noproc};
|
|
|
+handle_stepdown_exception(_Err, timeout, _St, ConnMod, Pid, Action) ->
|
|
|
+ ErrInfo = ?REQ_DOWN_ERR_CHANNEL_INFO(ConnMod, Pid, Action),
|
|
|
+ ?tp(warning, "ws_session_stepdown_request_timeout", ErrInfo),
|
|
|
+ ok = force_kill(Pid),
|
|
|
+ {error, timeout};
|
|
|
+handle_stepdown_exception(_Err, {timeout, {gen_server, call, _}}, _St, ConnMod, Pid, Action) ->
|
|
|
+ ErrInfo = ?REQ_DOWN_ERR_CHANNEL_INFO(ConnMod, Pid, Action),
|
|
|
+ ?tp(warning, "session_stepdown_request_timeout", ErrInfo),
|
|
|
+ ok = force_kill(Pid),
|
|
|
+ {error, timeout};
|
|
|
+handle_stepdown_exception(Err, Reason, St, ConnMod, Pid, Action) ->
|
|
|
+ ErroInfo = ?REQ_DOWN_ERR_CHANNEL_INFO(ConnMod, Pid, Action)#{
|
|
|
+ error => Err,
|
|
|
+ reason => Reason,
|
|
|
+ stacktrace => St
|
|
|
+ },
|
|
|
+ ?tp(error, "session_stepdown_request_exception", ErroInfo),
|
|
|
+ ok = force_kill(Pid),
|
|
|
+ {error, unexpected_exception}.
|
|
|
+
|
|
|
force_kill(Pid) ->
|
|
|
exit(Pid, kill),
|
|
|
ok.
|