Просмотр исходного кода

feat(session): add custom session timers mechanism

That are managed exclusively by the session implementation, unlike
common session timers that are managed by the channel itself.
Andrew Mayorov 2 лет назад
Родитель
Сommit
b1f144ab8b

+ 11 - 0
apps/emqx/src/emqx_channel.erl

@@ -1338,6 +1338,17 @@ handle_timeout(
             NChannel = Channel#channel{session = NSession},
             handle_out(publish, Publishes, reset_timer(TimerName, Timeout, NChannel))
     end;
+handle_timeout(
+    _TRef,
+    {emqx_session, TimerName},
+    Channel = #channel{session = Session, clientinfo = ClientInfo}
+) ->
+    case emqx_session:handle_timeout(ClientInfo, TimerName, Session) of
+        {ok, [], NSession} ->
+            {ok, Channel#channel{session = NSession}};
+        {ok, Replies, NSession} ->
+            handle_out(publish, Replies, Channel#channel{session = NSession})
+    end;
 handle_timeout(_TRef, expire_session, Channel) ->
     shutdown(expired, Channel);
 handle_timeout(

+ 41 - 2
apps/emqx/src/emqx_session.erl

@@ -88,6 +88,13 @@
     terminate/3
 ]).
 
+% Timers
+-export([
+    ensure_timer/3,
+    reset_timer/3,
+    cancel_timer/2
+]).
+
 % Foreign session implementations
 -export([enrich_delivers/3]).
 
@@ -103,7 +110,9 @@
     conninfo/0,
     reply/0,
     replies/0,
-    common_timer_name/0
+    common_timer_name/0,
+    custom_timer_name/0,
+    timerset/0
 ]).
 
 -type session_id() :: _TODO.
@@ -118,6 +127,7 @@
     }.
 
 -type common_timer_name() :: retry_delivery | expire_awaiting_rel.
+-type custom_timer_name() :: atom().
 
 -type message() :: emqx_types:message().
 -type publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}.
@@ -144,6 +154,8 @@
     emqx_session_mem:session()
     | emqx_persistent_session_ds:session().
 
+-type timerset() :: #{custom_timer_name() => _TimerRef :: reference()}.
+
 -define(INFO_KEYS, [
     id,
     created_at,
@@ -442,14 +454,41 @@ enrich_subopts(_Opt, _V, Msg, _) ->
 %% Timeouts
 %%--------------------------------------------------------------------
 
--spec handle_timeout(clientinfo(), common_timer_name(), t()) ->
+-spec handle_timeout(clientinfo(), common_timer_name() | custom_timer_name(), t()) ->
     {ok, replies(), t()}
+    %% NOTE: only relevant for `common_timer_name()`
     | {ok, replies(), timeout(), t()}.
 handle_timeout(ClientInfo, Timer, Session) ->
     ?IMPL(Session):handle_timeout(ClientInfo, Timer, Session).
 
 %%--------------------------------------------------------------------
 
+-spec ensure_timer(custom_timer_name(), timeout(), timerset()) ->
+    timerset().
+ensure_timer(Name, _Time, Timers = #{}) when is_map_key(Name, Timers) ->
+    Timers;
+ensure_timer(Name, Time, Timers = #{}) when Time > 0 ->
+    TRef = emqx_utils:start_timer(Time, {?MODULE, Name}),
+    Timers#{Name => TRef}.
+
+-spec reset_timer(custom_timer_name(), timeout(), timerset()) ->
+    timerset().
+reset_timer(Name, Time, Channel) ->
+    ensure_timer(Name, Time, cancel_timer(Name, Channel)).
+
+-spec cancel_timer(custom_timer_name(), timerset()) ->
+    timerset().
+cancel_timer(Name, Timers) ->
+    case maps:take(Name, Timers) of
+        {TRef, NTimers} ->
+            ok = emqx_utils:cancel_timer(TRef),
+            NTimers;
+        error ->
+            Timers
+    end.
+
+%%--------------------------------------------------------------------
+
 -spec disconnect(clientinfo(), t()) ->
     {idle | shutdown, t()}.
 disconnect(_ClientInfo, Session) ->

+ 3 - 0
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -2116,6 +2116,9 @@ handle_timeout(_TRef, expire_session, Channel) ->
 handle_timeout(_TRef, expire_asleep, Channel) ->
     shutdown(asleep_timeout, Channel);
 handle_timeout(_TRef, Msg, Channel) ->
+    %% NOTE
+    %% We do not expect `emqx_mqttsn_session` to set up any custom timers (i.e with
+    %% `emqx_session:ensure_timer/3`), because `emqx_session_mem` doesn't use any.
     ?SLOG(error, #{
         msg => "unexpected_timeout",
         timeout_msg => Msg