|
|
@@ -72,7 +72,7 @@
|
|
|
]).
|
|
|
|
|
|
%% Internal export
|
|
|
--export([stats_fun/0]).
|
|
|
+-export([stats_fun/0, clean_down/1]).
|
|
|
|
|
|
-type(chan_pid() :: pid()).
|
|
|
|
|
|
@@ -93,7 +93,9 @@
|
|
|
%% Server name
|
|
|
-define(CM, ?MODULE).
|
|
|
|
|
|
--define(T_TAKEOVER, 15000).
|
|
|
+-define(T_KICK, 5_000).
|
|
|
+-define(T_GET_INFO, 5_000).
|
|
|
+-define(T_TAKEOVER, 15_000).
|
|
|
|
|
|
%% @doc Start the channel manager.
|
|
|
-spec(start_link() -> startlink_ret()).
|
|
|
@@ -164,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
error:badarg -> undefined
|
|
|
end;
|
|
|
get_chan_info(ClientId, ChanPid) ->
|
|
|
- rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
|
|
|
+ rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
|
|
|
|
|
|
%% @doc Update infos of the channel.
|
|
|
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
|
|
|
@@ -189,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
error:badarg -> undefined
|
|
|
end;
|
|
|
get_chan_stats(ClientId, ChanPid) ->
|
|
|
- rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
|
|
|
+ rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
|
|
|
|
|
|
%% @doc Set channel's stats.
|
|
|
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
|
|
|
@@ -257,7 +259,7 @@ takeover_session(ClientId) ->
|
|
|
takeover_session(ClientId, ChanPid);
|
|
|
ChanPids ->
|
|
|
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
|
|
- ?LOG(error, "More than one channel found: ~p", [ChanPids]),
|
|
|
+ ?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]),
|
|
|
lists:foreach(fun(StalePid) ->
|
|
|
catch discard_session(ClientId, StalePid)
|
|
|
end, StalePids),
|
|
|
@@ -269,77 +271,113 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
undefined ->
|
|
|
{error, not_found};
|
|
|
ConnMod when is_atom(ConnMod) ->
|
|
|
+ %% TODO: if takeover times out, maybe kill the old?
|
|
|
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
|
|
{ok, ConnMod, ChanPid, Session}
|
|
|
end;
|
|
|
-
|
|
|
takeover_session(ClientId, ChanPid) ->
|
|
|
- rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
|
|
|
+ rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
|
|
|
|
|
|
%% @doc Discard all the sessions identified by the ClientId.
|
|
|
-spec(discard_session(emqx_types:clientid()) -> ok).
|
|
|
discard_session(ClientId) when is_binary(ClientId) ->
|
|
|
case lookup_channels(ClientId) of
|
|
|
[] -> ok;
|
|
|
- ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
|
|
|
+ ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
|
|
|
end.
|
|
|
|
|
|
-do_discard_session(ClientId, Pid) ->
|
|
|
+%% @private Kick a local stale session to force it step down.
|
|
|
+%% If failed to kick (e.g. timeout) force a kill.
|
|
|
+%% Keeping the stale pid around, or returning error or raise an exception
|
|
|
+%% benefits nobody.
|
|
|
+-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
|
|
|
+kick_or_kill(Action, ConnMod, Pid) ->
|
|
|
try
|
|
|
- discard_session(ClientId, Pid)
|
|
|
+ %% this is essentailly a gen_server:call implemented in emqx_connection
|
|
|
+ %% and emqx_ws_connection.
|
|
|
+ %% the handle_call is implemented in emqx_channel
|
|
|
+ ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
|
|
|
catch
|
|
|
_ : noproc -> % emqx_ws_connection: call
|
|
|
- ?tp(debug, "session_already_gone", #{pid => Pid}),
|
|
|
- ok;
|
|
|
+ ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
|
|
|
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
|
|
- ?tp(debug, "session_already_gone", #{pid => Pid}),
|
|
|
- ok;
|
|
|
- _ : {'EXIT', {noproc, _}} -> % rpc_call/3
|
|
|
- ?tp(debug, "session_already_gone", #{pid => Pid}),
|
|
|
- ok;
|
|
|
+ ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
|
|
|
+ _ : {shutdown, _} ->
|
|
|
+ ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
|
|
|
_ : {{shutdown, _}, _} ->
|
|
|
- ?tp(debug, "session_already_shutdown", #{pid => Pid}),
|
|
|
- ok;
|
|
|
+ ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
|
|
|
+ _ : {timeout, {gen_server, call, _}} ->
|
|
|
+ ?tp(warning, "session_kick_timeout",
|
|
|
+ #{pid => Pid,
|
|
|
+ action => Action,
|
|
|
+ stale_channel => stale_channel_info(Pid)
|
|
|
+ }),
|
|
|
+ ok = force_kill(Pid);
|
|
|
_ : Error : St ->
|
|
|
- ?tp(error, "failed_to_discard_session",
|
|
|
- #{pid => Pid, reason => Error, stacktrace=>St})
|
|
|
+ ?tp(error, "session_kick_exception",
|
|
|
+ #{pid => Pid,
|
|
|
+ action => Action,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => St,
|
|
|
+ stale_channel => stale_channel_info(Pid)
|
|
|
+ }),
|
|
|
+ ok = force_kill(Pid)
|
|
|
end.
|
|
|
|
|
|
-discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
+force_kill(Pid) ->
|
|
|
+ exit(Pid, kill),
|
|
|
+ ok.
|
|
|
+
|
|
|
+stale_channel_info(Pid) ->
|
|
|
+ process_info(Pid, [status, message_queue_len, current_stacktrace]).
|
|
|
+
|
|
|
+discard_session(ClientId, ChanPid) ->
|
|
|
+ kick_session(discard, ClientId, ChanPid).
|
|
|
+
|
|
|
+kick_session(ClientId, ChanPid) ->
|
|
|
+ kick_session(kick, ClientId, ChanPid).
|
|
|
+
|
|
|
+%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
|
|
|
+kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
case get_chann_conn_mod(ClientId, ChanPid) of
|
|
|
- undefined -> ok;
|
|
|
+ undefined ->
|
|
|
+ %% already deregistered
|
|
|
+ ok;
|
|
|
ConnMod when is_atom(ConnMod) ->
|
|
|
- ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
|
|
|
+ ok = kick_or_kill(Action, ConnMod, ChanPid)
|
|
|
end;
|
|
|
-
|
|
|
-discard_session(ClientId, ChanPid) ->
|
|
|
- rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
|
|
|
+kick_session(Action, ClientId, ChanPid) ->
|
|
|
+ %% call remote node on the old APIs because we do not know if they have upgraded
|
|
|
+ %% to have kick_session/3
|
|
|
+ Function = case Action of
|
|
|
+ discard -> discard_session;
|
|
|
+ kick -> kick_session
|
|
|
+ end,
|
|
|
+ try
|
|
|
+ rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
|
|
|
+ catch
|
|
|
+ Error : Reason ->
|
|
|
+ %% This should mostly be RPC failures.
|
|
|
+ %% However, if the node is still running the old version
|
|
|
+ %% code (prior to emqx app 4.3.10) some of the RPC handler
|
|
|
+ %% exceptions may get propagated to a new version node
|
|
|
+ ?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p",
|
|
|
+ [node(ChanPid), Action, Error, Reason])
|
|
|
+ end.
|
|
|
|
|
|
kick_session(ClientId) ->
|
|
|
case lookup_channels(ClientId) of
|
|
|
- [] -> {error, not_found};
|
|
|
- [ChanPid] ->
|
|
|
- kick_session(ClientId, ChanPid);
|
|
|
+ [] ->
|
|
|
+ ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]),
|
|
|
+ ok;
|
|
|
ChanPids ->
|
|
|
- [ChanPid|StalePids] = lists:reverse(ChanPids),
|
|
|
- ?LOG(error, "More than one channel found: ~p", [ChanPids]),
|
|
|
- lists:foreach(fun(StalePid) ->
|
|
|
- catch discard_session(ClientId, StalePid)
|
|
|
- end, StalePids),
|
|
|
- kick_session(ClientId, ChanPid)
|
|
|
+ case length(ChanPids) > 1 of
|
|
|
+ true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]);
|
|
|
+ false -> ok
|
|
|
+ end,
|
|
|
+ lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
|
|
|
end.
|
|
|
|
|
|
-kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
- case get_chan_info(ClientId, ChanPid) of
|
|
|
- #{conninfo := #{conn_mod := ConnMod}} ->
|
|
|
- ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
|
|
|
- undefined ->
|
|
|
- {error, not_found}
|
|
|
- end;
|
|
|
-
|
|
|
-kick_session(ClientId, ChanPid) ->
|
|
|
- rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
|
|
|
-
|
|
|
%% @doc Is clean start?
|
|
|
% is_clean_start(#{clean_start := false}) -> false;
|
|
|
% is_clean_start(_Attrs) -> true.
|
|
|
@@ -375,10 +413,16 @@ lookup_channels(local, ClientId) ->
|
|
|
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
|
|
|
|
|
|
%% @private
|
|
|
-rpc_call(Node, Fun, Args) ->
|
|
|
- case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
|
|
|
- {badrpc, Reason} -> error(Reason);
|
|
|
- Res -> Res
|
|
|
+rpc_call(Node, Fun, Args, Timeout) ->
|
|
|
+ case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
|
|
|
+ {badrpc, Reason} ->
|
|
|
+ %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
|
|
|
+ %% should catch all exceptions and always return 'ok'.
|
|
|
+ %% This leaves 'badrpc' only possible when there is problem
|
|
|
+ %% calling the remote node.
|
|
|
+ error({badrpc, Reason});
|
|
|
+ Res ->
|
|
|
+ Res
|
|
|
end.
|
|
|
|
|
|
%% @private
|
|
|
@@ -411,7 +455,7 @@ handle_cast(Msg, State) ->
|
|
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
|
|
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
|
|
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
|
|
- ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
|
|
|
+ ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
|
|
|
{noreply, State#{chan_pmon := PMon1}};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
@@ -447,5 +491,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
|
error:badarg -> undefined
|
|
|
end;
|
|
|
get_chann_conn_mod(ClientId, ChanPid) ->
|
|
|
- rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
|
|
|
+ rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
|
|
|
|