Explorar o código

refactor(session): bring back common session timers

Andrew Mayorov %!s(int64=2) %!d(string=hai) anos
pai
achega
97881ff3ca

+ 1 - 4
apps/emqx/include/emqx_session_mem.hrl

@@ -49,10 +49,7 @@
     %% Awaiting PUBREL Timeout (Unit: millisecond)
     await_rel_timeout :: timeout(),
     %% Created at
-    created_at :: pos_integer(),
-
-    %% Timers
-    timers :: #{_Name => reference()}
+    created_at :: pos_integer()
 }).
 
 -endif.

+ 38 - 9
apps/emqx/src/emqx_channel.erl

@@ -130,6 +130,10 @@
 
 -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
 
+-define(IS_COMMON_SESSION_TIMER(N),
+    ((N == retry_delivery) orelse (N == expire_awaiting_rel))
+).
+
 -define(LIMITER_ROUTING, message_routing).
 
 -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@@ -723,8 +727,9 @@ do_publish(
         {ok, PubRes, NSession} ->
             RC = pubrec_reason_code(PubRes),
             NChannel0 = Channel#channel{session = NSession},
-            NChannel1 = ensure_quota(PubRes, NChannel0),
-            handle_out(pubrec, {PacketId, RC}, NChannel1);
+            NChannel1 = ensure_timer(expire_awaiting_rel, NChannel0),
+            NChannel2 = ensure_quota(PubRes, NChannel1),
+            handle_out(pubrec, {PacketId, RC}, NChannel2);
         {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ok = emqx_metrics:inc('packets.publish.inuse'),
             handle_out(pubrec, {PacketId, RC}, Channel);
@@ -953,7 +958,7 @@ handle_deliver(
             {ok, Channel#channel{session = NSession}};
         {ok, Publishes, NSession} ->
             NChannel = Channel#channel{session = NSession},
-            handle_out(publish, Publishes, NChannel)
+            handle_out(publish, Publishes, ensure_timer(retry_delivery, NChannel))
     end.
 
 %% Nack delivers from shared subscription
@@ -1067,6 +1072,10 @@ return_connack(AckPacket, Channel) ->
             },
             {Packets, NChannel2} = do_deliver(Publishes, NChannel1),
             Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0],
+            % NOTE
+            % Session timers are not restored here, so there's a tiny chance that
+            % the session becomes stuck, when it already has no place to track new
+            % messages.
             {ok, Replies ++ Outgoing, NChannel2}
     end.
 
@@ -1307,14 +1316,27 @@ handle_timeout(
     end;
 handle_timeout(
     _TRef,
-    {emqx_session, Name},
+    Name,
+    Channel = #channel{conn_state = disconnected}
+) when ?IS_COMMON_SESSION_TIMER(Name) ->
+    {ok, Channel};
+handle_timeout(
+    _TRef,
+    Name,
     Channel = #channel{session = Session, clientinfo = ClientInfo}
-) ->
+) when ?IS_COMMON_SESSION_TIMER(Name) ->
+    % NOTE
+    % Responsibility for these timers is smeared across both this module and the
+    % `emqx_session` module: the latter holds configured timer intervals, and is
+    % responsible for the actual timeout logic. Yet they are managed here, since
+    % they are kind of common to all session implementations.
     case emqx_session:handle_timeout(ClientInfo, Name, Session) of
-        {ok, [], NSession} ->
-            {ok, Channel#channel{session = NSession}};
-        {ok, Replies, NSession} ->
-            handle_out(publish, Replies, Channel#channel{session = NSession})
+        {ok, Publishes, NSession} ->
+            NChannel = Channel#channel{session = NSession},
+            handle_out(publish, Publishes, clean_timer(Name, NChannel));
+        {ok, Publishes, Timeout, NSession} ->
+            NChannel = Channel#channel{session = NSession},
+            handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel))
     end;
 handle_timeout(_TRef, expire_session, Channel) ->
     shutdown(expired, Channel);
@@ -1369,11 +1391,18 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
 reset_timer(Name, Channel) ->
     ensure_timer(Name, clean_timer(Name, Channel)).
 
+reset_timer(Name, Time, Channel) ->
+    ensure_timer(Name, Time, clean_timer(Name, Channel)).
+
 clean_timer(Name, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = maps:remove(Name, Timers)}.
 
 interval(keepalive, #channel{keepalive = KeepAlive}) ->
     emqx_keepalive:info(interval, KeepAlive);
+interval(retry_delivery, #channel{session = Session}) ->
+    emqx_session:info(retry_interval, Session);
+interval(expire_awaiting_rel, #channel{session = Session}) ->
+    emqx_session:info(await_rel_timeout, Session);
 interval(expire_session, #channel{conninfo = ConnInfo}) ->
     maps:get(expiry_interval, ConnInfo);
 interval(will_message, #channel{will_msg = WillMsg}) ->

+ 7 - 30
apps/emqx/src/emqx_session.erl

@@ -95,13 +95,6 @@
 % Foreign session implementations
 -export([enrich_delivers/3]).
 
-% Timers
--export([
-    ensure_timer/3,
-    reset_timer/3,
-    cancel_timer/2
-]).
-
 % Utilities
 -export([should_discard/1]).
 
@@ -113,7 +106,8 @@
     conf/0,
     conninfo/0,
     reply/0,
-    replies/0
+    replies/0,
+    common_timer_name/0
 ]).
 
 -type session_id() :: _TODO.
@@ -127,6 +121,8 @@
         expiry_interval => non_neg_integer()
     }.
 
+-type common_timer_name() :: retry_delivery | expire_awaiting_rel.
+
 -type message() :: emqx_types:message().
 -type publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}.
 -type pubrel() :: {pubrel, emqx_types:packet_id()}.
@@ -415,33 +411,14 @@ enrich_subopts(_Opt, _V, Msg, _) ->
 %% Timeouts
 %%--------------------------------------------------------------------
 
--spec handle_timeout(clientinfo(), atom(), t()) ->
-    {ok, t()} | {ok, replies(), t()}.
+-spec handle_timeout(clientinfo(), common_timer_name(), t()) ->
+    {ok, replies(), t()}
+    | {ok, replies(), timeout(), t()}.
 handle_timeout(ClientInfo, Timer, Session) ->
     ?IMPL(Session):handle_timeout(ClientInfo, Timer, Session).
 
 %%--------------------------------------------------------------------
 
-ensure_timer(Name, _Time, Timers = #{}) when is_map_key(Name, Timers) ->
-    Timers;
-ensure_timer(Name, Time, Timers = #{}) when Time > 0 ->
-    TRef = emqx_utils:start_timer(Time, {?MODULE, Name}),
-    Timers#{Name => TRef}.
-
-reset_timer(Name, Time, Channel) ->
-    ensure_timer(Name, Time, cancel_timer(Name, Channel)).
-
-cancel_timer(Name, Timers) ->
-    case maps:take(Name, Timers) of
-        {TRef, NTimers} ->
-            ok = emqx_utils:cancel_timer(TRef),
-            NTimers;
-        error ->
-            Timers
-    end.
-
-%%--------------------------------------------------------------------
-
 -spec disconnect(clientinfo(), t()) ->
     {idle | shutdown, t()}.
 disconnect(_ClientInfo, Session) ->

+ 24 - 57
apps/emqx/src/emqx_session_mem.erl

@@ -164,7 +164,6 @@ create(#{zone := Zone, clientid := ClientId}, #{expiry_interval := EI}, Conf) ->
         mqueue = emqx_mqueue:init(QueueOpts),
         next_pkt_id = 1,
         awaiting_rel = #{},
-        timers = #{},
         max_subscriptions = maps:get(max_subscriptions, Conf),
         max_awaiting_rel = maps:get(max_awaiting_rel, Conf),
         upgrade_qos = maps:get(upgrade_qos, Conf),
@@ -339,7 +338,7 @@ get_subscription(Topic, #session{subscriptions = Subs}) ->
 publish(
     PacketId,
     Msg = #message{qos = ?QOS_2, timestamp = Ts},
-    Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}
+    Session = #session{awaiting_rel = AwaitingRel}
 ) ->
     case is_awaiting_full(Session) of
         false ->
@@ -347,8 +346,7 @@ publish(
                 false ->
                     Results = emqx_broker:publish(Msg),
                     AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
-                    Session1 = ensure_timer(expire_awaiting_rel, Timeout, Session),
-                    {ok, Results, Session1#session{awaiting_rel = AwaitingRel1}};
+                    {ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
                 true ->
                     {error, ?RC_PACKET_IDENTIFIER_IN_USE}
             end;
@@ -417,7 +415,7 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
     case maps:take(PacketId, AwaitingRel) of
         {_Ts, AwaitingRel1} ->
             NSession = Session#session{awaiting_rel = AwaitingRel1},
-            {ok, reconcile_expire_timer(NSession)};
+            {ok, NSession};
         error ->
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
     end.
@@ -449,7 +447,7 @@ pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
 dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
     case emqx_mqueue:is_empty(Q) of
         true ->
-            {ok, [], reconcile_retry_timer(Session)};
+            {ok, [], Session};
         false ->
             {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
             do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1})
@@ -484,7 +482,7 @@ deliver(ClientInfo, Msgs, Session) ->
     do_deliver(ClientInfo, Msgs, [], Session).
 
 do_deliver(_ClientInfo, [], Publishes, Session) ->
-    {ok, lists:reverse(Publishes), reconcile_retry_timer(Session)};
+    {ok, lists:reverse(Publishes), Session};
 do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
     case deliver_msg(ClientInfo, Msg, Session) of
         {ok, [], Session1} ->
@@ -557,12 +555,13 @@ mark_begin_deliver(Msg) ->
 %% Timeouts
 %%--------------------------------------------------------------------
 
--spec handle_timeout(clientinfo(), atom(), session()) ->
-    {ok, replies(), session()}.
-handle_timeout(ClientInfo, retry_delivery = Name, Session) ->
-    retry(ClientInfo, clean_timer(Name, Session));
-handle_timeout(ClientInfo, expire_awaiting_rel = Name, Session) ->
-    expire(ClientInfo, clean_timer(Name, Session)).
+%% @doc Handle timeout events
+-spec handle_timeout(clientinfo(), emqx_session:common_timer_name(), session()) ->
+    {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
+handle_timeout(ClientInfo, retry_delivery, Session) ->
+    retry(ClientInfo, Session);
+handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
+    expire(ClientInfo, Session).
 
 %%--------------------------------------------------------------------
 %% Retry Delivery
@@ -585,8 +584,8 @@ retry(ClientInfo, Session = #session{inflight = Inflight}) ->
             )
     end.
 
-retry_delivery(_ClientInfo, [], Acc, _Now, Session) ->
-    {ok, lists:reverse(Acc), reconcile_retry_timer(Session)};
+retry_delivery(_ClientInfo, [], Acc, _, Session = #session{retry_interval = Interval}) ->
+    {ok, lists:reverse(Acc), Interval, Session};
 retry_delivery(
     ClientInfo,
     [{PacketId, #inflight_data{timestamp = Ts} = Data} | More],
@@ -599,8 +598,7 @@ retry_delivery(
             {Acc1, Inflight1} = do_retry_delivery(ClientInfo, PacketId, Data, Now, Acc, Inflight),
             retry_delivery(ClientInfo, More, Acc1, Now, Session#session{inflight = Inflight1});
         false ->
-            NSession = ensure_timer(retry_delivery, Interval - max(0, Age), Session),
-            {ok, lists:reverse(Acc), NSession}
+            {ok, lists:reverse(Acc), Interval - max(0, Age), Session}
     end.
 
 do_retry_delivery(
@@ -638,8 +636,7 @@ expire(ClientInfo, Session = #session{awaiting_rel = AwaitingRel}) ->
             {ok, [], Session};
         _ ->
             Now = erlang:system_time(millisecond),
-            NSession = expire_awaiting_rel(ClientInfo, Now, Session),
-            {ok, [], reconcile_expire_timer(NSession)}
+            expire_awaiting_rel(ClientInfo, Now, Session)
     end.
 
 expire_awaiting_rel(
@@ -651,7 +648,11 @@ expire_awaiting_rel(
     AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
     ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
     _ = emqx_session_events:handle_event(ClientInfo, {expired_rel, ExpiredCnt}),
-    Session#session{awaiting_rel = AwaitingRel1}.
+    Session1 = Session#session{awaiting_rel = AwaitingRel1},
+    case maps:size(AwaitingRel1) of
+        0 -> {ok, [], Session1};
+        _ -> {ok, [], Timeout, Session1}
+    end.
 
 %%--------------------------------------------------------------------
 %% Takeover, Resume and Replay
@@ -673,7 +674,7 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
     ),
     ok = emqx_metrics:inc('session.resumed'),
     ok = emqx_hooks:run('session.resumed', [ClientInfo, emqx_session:info(Session)]),
-    Session#session{timers = #{}}.
+    Session.
 
 -spec replay(emqx_types:clientinfo(), [emqx_types:message()], session()) ->
     {ok, replies(), session()}.
@@ -705,7 +706,7 @@ replay(ClientInfo, Session) ->
         emqx_inflight:to_list(Session#session.inflight)
     ),
     {ok, More, Session1} = dequeue(ClientInfo, Session),
-    {ok, append(PubsResend, More), reconcile_expire_timer(Session1)}.
+    {ok, append(PubsResend, More), Session1}.
 
 append(L1, []) -> L1;
 append(L1, L2) -> L1 ++ L2.
@@ -715,7 +716,7 @@ append(L1, L2) -> L1 ++ L2.
 -spec disconnect(session()) -> {idle, session()}.
 disconnect(Session = #session{}) ->
     % TODO: isolate expiry timer / timeout handling here?
-    {idle, cancel_timers(Session)}.
+    {idle, Session}.
 
 -spec terminate(Reason :: term(), session()) -> ok.
 terminate(Reason, Session) ->
@@ -780,40 +781,6 @@ with_ts(Msg) ->
 
 age(Now, Ts) -> Now - Ts.
 
-%%--------------------------------------------------------------------
-
-reconcile_retry_timer(Session = #session{inflight = Inflight}) ->
-    case emqx_inflight:is_empty(Inflight) of
-        false ->
-            ensure_timer(retry_delivery, Session#session.retry_interval, Session);
-        true ->
-            cancel_timer(retry_delivery, Session)
-    end.
-
-reconcile_expire_timer(Session = #session{awaiting_rel = AwaitingRel}) ->
-    case maps:size(AwaitingRel) of
-        0 ->
-            cancel_timer(expire_awaiting_rel, Session);
-        _ ->
-            ensure_timer(expire_awaiting_rel, Session#session.await_rel_timeout, Session)
-    end.
-
-%%--------------------------------------------------------------------
-
-ensure_timer(Name, Timeout, Session = #session{timers = Timers}) ->
-    NTimers = emqx_session:ensure_timer(Name, Timeout, Timers),
-    Session#session{timers = NTimers}.
-
-clean_timer(Name, Session = #session{timers = Timers}) ->
-    Session#session{timers = maps:remove(Name, Timers)}.
-
-cancel_timers(Session = #session{timers = Timers}) ->
-    ok = maps:foreach(fun(_Name, TRef) -> emqx_utils:cancel_timer(TRef) end, Timers),
-    Session#session{timers = #{}}.
-
-cancel_timer(Name, Session = #session{timers = Timers}) ->
-    Session#session{timers = emqx_session:cancel_timer(Name, Timers)}.
-
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 1 - 2
apps/emqx/test/emqx_proper_types.erl

@@ -147,8 +147,7 @@ sessioninfo() ->
             awaiting_rel = awaiting_rel(),
             max_awaiting_rel = non_neg_integer(),
             await_rel_timeout = safty_timeout(),
-            created_at = timestamp(),
-            timers = #{}
+            created_at = timestamp()
         },
         emqx_session:info(Session)
     ).

+ 16 - 48
apps/emqx/test/emqx_session_mem_SUITE.erl

@@ -20,7 +20,6 @@
 -compile(nowarn_export_all).
 
 -include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
@@ -38,13 +37,6 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 %% CT callbacks
 %%--------------------------------------------------------------------
 
--define(assertTimerSet(NAME, TIMEOUT),
-    ?assertReceive({timer, NAME, TIMEOUT} when is_integer(TIMEOUT))
-).
--define(assertTimerCancel(NAME),
-    ?assertReceive({timer, NAME, cancel})
-).
-
 init_per_suite(Config) ->
     ok = meck:new(
         [emqx_broker, emqx_hooks, emqx_session],
@@ -65,26 +57,6 @@ end_per_suite(Config) ->
     ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
     meck:unload([emqx_broker, emqx_hooks]).
 
-init_per_testcase(_TestCase, Config) ->
-    Pid = self(),
-    ok = meck:expect(
-        emqx_session, ensure_timer, fun(Name, Timeout, Timers) ->
-            _ = Pid ! {timer, Name, Timeout},
-            meck:passthrough([Name, Timeout, Timers])
-        end
-    ),
-    ok = meck:expect(
-        emqx_session, cancel_timer, fun(Name, Timers) ->
-            _ = Pid ! {timer, Name, cancel},
-            meck:passthrough([Name, Timers])
-        end
-    ),
-    Config.
-
-end_per_testcase(_TestCase, Config) ->
-    ok = meck:delete(emqx_session, ensure_timer, 3),
-    Config.
-
 %%--------------------------------------------------------------------
 %% Test cases for session init
 %%--------------------------------------------------------------------
@@ -191,7 +163,6 @@ t_publish_qos2(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
     Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
     {ok, [], Session} = emqx_session_mem:publish(1, Msg, session()),
-    ?assertTimerSet(expire_awaiting_rel, _Timeout),
     ?assertEqual(1, emqx_session_mem:info(awaiting_rel_cnt, Session)),
     {ok, Session1} = emqx_session_mem:pubrel(1, Session),
     ?assertEqual(0, emqx_session_mem:info(awaiting_rel_cnt, Session1)),
@@ -266,7 +237,6 @@ t_puback_with_dequeue(_) ->
     {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
     Session = session(#{inflight => Inflight, mqueue => Q}),
     {ok, Msg1, [{_, Msg3}], Session1} = emqx_session_mem:puback(clientinfo(), 1, Session),
-    ?assertTimerSet(retry_delivery, _Timeout),
     ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)),
     ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session1)),
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
@@ -335,7 +305,6 @@ t_dequeue(_) ->
     Session1 = emqx_session_mem:enqueue(clientinfo(), Msgs, Session),
     {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
         emqx_session_mem:dequeue(clientinfo(), Session1),
-    ?assertTimerSet(retry_delivery, _Timeout),
     ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session2)),
     ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session2)),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
@@ -349,7 +318,6 @@ t_deliver_qos0(_) ->
     Deliveries = enrich([delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], Session1),
     {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
         emqx_session_mem:deliver(clientinfo(), Deliveries, Session1),
-    ?assertTimerCancel(retry_delivery),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
 
@@ -361,7 +329,6 @@ t_deliver_qos1(_) ->
     Delivers = enrich([delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], Session),
     {ok, [{1, Msg1}, {2, Msg2}], Session1} =
         emqx_session_mem:deliver(clientinfo(), Delivers, Session),
-    ?assertTimerSet(retry_delivery, _Timeout),
     ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
@@ -378,7 +345,6 @@ t_deliver_qos2(_) ->
     Delivers = enrich([delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], Session),
     {ok, [{1, Msg1}, {2, Msg2}], Session1} =
         emqx_session_mem:deliver(clientinfo(), Delivers, Session),
-    ?assertTimerSet(retry_delivery, _Timeout),
     ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session1)),
     ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
@@ -390,7 +356,6 @@ t_deliver_one_msg(_) ->
         enrich(delivery(?QOS_1, <<"t1">>), Session),
         Session
     ),
-    ?assertTimerSet(retry_delivery, _Timeout),
     ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
 
@@ -399,13 +364,11 @@ t_deliver_when_inflight_is_full(_) ->
     Delivers = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session),
     {ok, Publishes, Session1} =
         emqx_session_mem:deliver(clientinfo(), Delivers, Session),
-    {timer, _, Timeout} = ?assertTimerSet(retry_delivery, _Timeout),
     ?assertEqual(1, length(Publishes)),
     ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session1)),
     ?assertEqual(1, emqx_session_mem:info(mqueue_len, Session1)),
     {ok, Msg1, [{2, Msg2}], Session2} =
         emqx_session_mem:puback(clientinfo(), 1, Session1),
-    ?assertTimerSet(retry_delivery, Timeout),
     ?assertEqual(1, emqx_session_mem:info(inflight_cnt, Session2)),
     ?assertEqual(0, emqx_session_mem:info(mqueue_len, Session2)),
     ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
@@ -456,14 +419,16 @@ t_retry(_) ->
     RetryIntervalMs = 100,
     Session = session(#{retry_interval => RetryIntervalMs}),
     Delivers = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session),
-    {ok, Pubs, Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session),
-    {timer, Name, _} = ?assertTimerSet(_Name, RetryIntervalMs),
+    {ok, Pubs, Session1} = emqx_session_mem:deliver(
+        clientinfo(), Delivers, Session
+    ),
     %% 0.2s
     ElapseMs = 200,
     ok = timer:sleep(ElapseMs),
     Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs],
-    {ok, Msgs1T, Session2} = emqx_session_mem:handle_timeout(clientinfo(), Name, Session1),
-    ?assertTimerSet(Name, RetryIntervalMs),
+    {ok, Msgs1T, RetryIntervalMs, Session2} = emqx_session_mem:handle_timeout(
+        clientinfo(), retry_delivery, Session1
+    ),
     ?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)),
     ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session2)).
 
@@ -504,17 +469,20 @@ t_replay(_) ->
     ?assertEqual(6, emqx_session_mem:info(inflight_cnt, Session3)).
 
 t_expire_awaiting_rel(_) ->
-    {ok, [], Session} = emqx_session_mem:expire(clientinfo(), session()),
-    Timeout = emqx_session_mem:info(await_rel_timeout, Session),
-    Session1 = emqx_session_mem:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session),
-    {ok, [], Session2} = emqx_session_mem:expire(clientinfo(), Session1),
-    ?assertTimerSet(expire_awaiting_rel, Timeout),
-    ?assertEqual(#{1 => Ts}, emqx_session_mem:info(awaiting_rel, Session2)).
+    Now = ts(millisecond),
+    AwaitRelTimeout = 10000,
+    Session = session(#{await_rel_timeout => AwaitRelTimeout}),
+    Ts1 = Now - 1000,
+    Ts2 = Now - 20000,
+    {ok, [], Session1} = emqx_session_mem:expire(clientinfo(), Session),
+    Session2 = emqx_session_mem:set_field(awaiting_rel, #{1 => Ts1, 2 => Ts2}, Session1),
+    {ok, [], Timeout, Session3} = emqx_session_mem:expire(clientinfo(), Session2),
+    ?assertEqual(#{1 => Ts1}, emqx_session_mem:info(awaiting_rel, Session3)),
+    ?assert(Timeout =< AwaitRelTimeout).
 
 t_expire_awaiting_rel_all(_) ->
     Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}),
     {ok, [], Session1} = emqx_session_mem:expire(clientinfo(), Session),
-    ?assertTimerCancel(expire_awaiting_rel),
     ?assertEqual(#{}, emqx_session_mem:info(awaiting_rel, Session1)).
 
 %%--------------------------------------------------------------------

+ 39 - 11
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -1155,7 +1155,11 @@ do_publish(
 ) ->
     case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of
         {ok, _PubRes, NSession} ->
-            handle_out(pubrec, MsgId, Channel#channel{session = NSession});
+            NChannel1 = ensure_timer(
+                expire_awaiting_rel,
+                Channel#channel{session = NSession}
+            ),
+            handle_out(pubrec, MsgId, NChannel1);
         {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
             ok = metrics_inc(Ctx, 'packets.publish.inuse'),
             %% XXX: Use PUBACK to reply a PUBLISH Error Code
@@ -1980,7 +1984,11 @@ handle_deliver(
     of
         {ok, Publishes, NSession} ->
             NChannel = Channel#channel{session = NSession},
-            handle_out(publish, Publishes, NChannel);
+            handle_out(
+                publish,
+                Publishes,
+                ensure_timer(retry_delivery, NChannel)
+            );
         {ok, NSession} ->
             {ok, Channel#channel{session = NSession}}
     end.
@@ -2046,27 +2054,41 @@ handle_timeout(
     end;
 handle_timeout(
     _TRef,
-    {emqx_session, _Name},
+    retry_delivery,
     Channel = #channel{conn_state = disconnected}
 ) ->
     {ok, Channel};
 handle_timeout(
     _TRef,
-    {emqx_session, _Name},
+    retry_delivery,
     Channel = #channel{conn_state = asleep}
+) ->
+    {ok, reset_timer(retry_delivery, Channel)};
+handle_timeout(
+    _TRef,
+    _Name = expire_awaiting_rel,
+    Channel = #channel{conn_state = disconnected}
 ) ->
     {ok, Channel};
 handle_timeout(
     _TRef,
-    {emqx_session, Name},
-    Channel = #channel{session = Session, clientinfo = ClientInfo}
+    Name = expire_awaiting_rel,
+    Channel = #channel{conn_state = asleep}
 ) ->
+    {ok, reset_timer(Name, Channel)};
+handle_timeout(
+    _TRef,
+    Name,
+    Channel = #channel{session = Session, clientinfo = ClientInfo}
+) when Name == retry_delivery; Name == expire_awaiting_rel ->
     case emqx_mqttsn_session:handle_timeout(ClientInfo, Name, Session) of
-        {ok, [], NSession} ->
-            {ok, Channel#channel{session = NSession}};
         {ok, Publishes, NSession} ->
+            NChannel = Channel#channel{session = NSession},
+            handle_out(publish, Publishes, clean_timer(Name, NChannel));
+        {ok, Publishes, Timeout, NSession} ->
+            NChannel = Channel#channel{session = NSession},
             %% XXX: These replay messages should awaiting register acked?
-            handle_out(publish, Publishes, Channel#channel{session = NSession})
+            handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel))
     end;
 handle_timeout(
     _TRef,
@@ -2195,12 +2217,18 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
 reset_timer(Name, Channel) ->
     ensure_timer(Name, clean_timer(Name, Channel)).
 
+reset_timer(Name, Time, Channel) ->
+    ensure_timer(Name, Time, clean_timer(Name, Channel)).
+
 clean_timer(Name, Channel = #channel{timers = Timers}) ->
     Channel#channel{timers = maps:remove(Name, Timers)}.
 
 interval(keepalive, #channel{keepalive = KeepAlive}) ->
-    emqx_keepalive:info(interval, KeepAlive).
-
+    emqx_keepalive:info(interval, KeepAlive);
+interval(retry_delivery, #channel{session = Session}) ->
+    emqx_mqttsn_session:info(retry_interval, Session);
+interval(expire_awaiting_rel, #channel{session = Session}) ->
+    emqx_mqttsn_session:info(await_rel_timeout, Session).
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl

@@ -131,7 +131,7 @@ with_sess(Fun, Args, Session = #{session := Sess}) ->
         %% for publish / pubrec / pubcomp / deliver
         {ok, ResultReplies, Sess1} ->
             {ok, ResultReplies, Session#{session := Sess1}};
-        %% for puback
+        %% for puback / handle_timeout
         {ok, Msgs, Replies, Sess1} ->
             {ok, Msgs, Replies, Session#{session := Sess1}};
         %% for any errors