|
|
@@ -74,12 +74,6 @@
|
|
|
|
|
|
-type replies() :: emqx_types:packet() | reply() | [reply()].
|
|
|
|
|
|
--define(TIMER_TABLE, #{
|
|
|
- alive_timer => keepalive,
|
|
|
- force_timer => force_close,
|
|
|
- idle_timer => force_close_idle
|
|
|
-}).
|
|
|
-
|
|
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -224,7 +218,7 @@ address({Host, Port}) ->
|
|
|
|
|
|
%% avoid udp connection process leak
|
|
|
start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
|
|
|
- ensure_timer(idle_timer, Channel);
|
|
|
+ ensure_timer(force_close_idle, Channel);
|
|
|
start_idle_checking_timer(Channel) ->
|
|
|
Channel.
|
|
|
|
|
|
@@ -293,10 +287,10 @@ handle_timeout(
|
|
|
case emqx_keepalive:check(StatVal, Keepalive) of
|
|
|
{ok, NKeepalive} ->
|
|
|
NChannel = Channel#channel{keepalive = NKeepalive},
|
|
|
- {ok, reset_timer(alive_timer, NChannel)};
|
|
|
+ {ok, reset_timer(keepalive, NChannel)};
|
|
|
{error, timeout} ->
|
|
|
Req = #{type => 'KEEPALIVE'},
|
|
|
- NChannel = remove_timer_ref(alive_timer, Channel),
|
|
|
+ NChannel = remove_timer_ref(keepalive, Channel),
|
|
|
%% close connection if keepalive timeout
|
|
|
Replies = [{event, disconnected}, {close, keepalive_timeout}],
|
|
|
NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{
|
|
|
@@ -419,7 +413,7 @@ handle_call(
|
|
|
NConnInfo = ConnInfo#{keepalive => Interval},
|
|
|
NClientInfo = ClientInfo#{keepalive => Interval},
|
|
|
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
|
|
|
- {reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))};
|
|
|
+ {reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(force_close_idle, NChannel))};
|
|
|
handle_call(
|
|
|
{subscribe_from_client, TopicFilter, Qos},
|
|
|
_From,
|
|
|
@@ -529,7 +523,7 @@ handle_info(
|
|
|
_ ->
|
|
|
Channel
|
|
|
end,
|
|
|
- Channel2 = ensure_timer(force_timer, Channel1),
|
|
|
+ Channel2 = ensure_timer(force_close, Channel1),
|
|
|
{ok, ensure_disconnected(Reason, Channel2)}
|
|
|
end;
|
|
|
handle_info(
|
|
|
@@ -547,13 +541,13 @@ handle_info(
|
|
|
|
|
|
ShutdownNow =
|
|
|
emqx_exproto_gcli:is_empty(GClient) andalso
|
|
|
- maps:get(force_timer, Timers, undefined) =/= undefined,
|
|
|
+ maps:get(force_close, Timers, undefined) =/= undefined,
|
|
|
case Result of
|
|
|
ok when not ShutdownNow ->
|
|
|
GClient1 = emqx_exproto_gcli:maybe_shoot(GClient),
|
|
|
{ok, Channel#channel{gcli = GClient1}};
|
|
|
ok when ShutdownNow ->
|
|
|
- Channel1 = cancel_timer(force_timer, Channel),
|
|
|
+ Channel1 = cancel_timer(force_close, Channel),
|
|
|
{shutdown, Channel1#channel.closed_reason, Channel1};
|
|
|
{error, Reason} ->
|
|
|
{shutdown, {error, {FunName, Reason}}, Channel}
|
|
|
@@ -711,7 +705,7 @@ ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
|
|
|
ensure_keepalive_timer(Interval, Channel) ->
|
|
|
StatVal = emqx_gateway_conn:keepalive_stats(recv),
|
|
|
Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
|
|
|
- ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
|
|
+ ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
|
|
|
|
|
|
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
|
|
|
TRef = maps:get(Name, Timers, undefined),
|
|
|
@@ -723,8 +717,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
|
|
|
end.
|
|
|
|
|
|
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
|
|
|
- Msg = maps:get(Name, ?TIMER_TABLE),
|
|
|
- TRef = emqx_utils:start_timer(Time, Msg),
|
|
|
+ TRef = emqx_utils:start_timer(Time, Name),
|
|
|
Channel#channel{timers = Timers#{Name => TRef}}.
|
|
|
|
|
|
reset_timer(Name, Channel) ->
|
|
|
@@ -737,11 +730,11 @@ cancel_timer(Name, Channel = #channel{timers = Timers}) ->
|
|
|
remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
|
|
|
Channel#channel{timers = maps:remove(Name, Timers)}.
|
|
|
|
|
|
-interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
|
|
|
+interval(force_close_idle, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
|
|
|
IdleTimeout;
|
|
|
-interval(force_timer, _) ->
|
|
|
+interval(force_close, _) ->
|
|
|
15000;
|
|
|
-interval(alive_timer, #channel{keepalive = Keepalive}) ->
|
|
|
+interval(keepalive, #channel{keepalive = Keepalive}) ->
|
|
|
emqx_keepalive:info(interval, Keepalive).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|