Преглед изворни кода

feat(sn): introduce subs_resume option

As the mqtt-sn v1.2 spec metioned, the gateway will be able to sync the
subscriptions topic-name registry to client when the client resume
it's session

port from: https://github.com/emqx/emqx/pull/7300
JianBo He пре 3 година
родитељ
комит
fbc0240f26

+ 7 - 0
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -142,6 +142,13 @@ which do not support any other features except this one.<br>
 There is no connection setup nor tear down, no registration nor subscription.<br>
 There is no connection setup nor tear down, no registration nor subscription.<br>
 The client just sends its 'PUBLISH' messages to a GW"
 The client just sends its 'PUBLISH' messages to a GW"
            })}
            })}
+    , {subs_resume,
+       sc(boolean(),
+          #{ default => false
+           , desc =>
+"Whether to initiate all subscribed topic name registration messages to the
+client after the Session has been taken over by a new channel."
+           })}
     , {predefined,
     , {predefined,
        sc(hoconsc:array(ref(mqttsn_predefined)),
        sc(hoconsc:array(ref(mqttsn_predefined)),
           #{ default => []
           #{ default => []

+ 178 - 125
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -114,7 +114,12 @@
 
 
 -define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>).
 -define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>).
 
 
--define(REGISTER_TIMEOUT, 10000). % 10s
+-define(REGISTER_INFLIGHT(TopicId, TopicName),
+        #channel{register_inflight = {TopicId, _, TopicName}}).
+
+-define(MAX_RETRY_TIMES, 3).
+
+-define(REGISTER_TIMEOUT, 5000). % 5s
 -define(DEFAULT_SESSION_EXPIRY, 7200000). %% 2h
 -define(DEFAULT_SESSION_EXPIRY, 7200000). %% 2h
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -546,28 +551,19 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
             {ok, {outgoing, AckPacket}, Channel}
             {ok, {outgoing, AckPacket}, Channel}
     end;
     end;
 
 
-handle_in(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED),
-          Channel = #channel{register_inflight = Inflight}) ->
-    case Inflight of
-        {TopicId, _, TopicName} ->
-            ?SLOG(debug, #{ msg => "register_topic_name_to_client_succesfully"
-                          , topic_id => TopicId
-                          , topic_name => TopicName
-                          }),
-            NChannel = cancel_timer(
-                         register_timer,
-                         Channel#channel{register_inflight = undefined}),
-            send_next_register_or_replay_publish(TopicName, NChannel);
-        _ ->
-            ?SLOG(error, #{ msg => "unexpected_regack_msg"
-                          , msg_id => MsgId
-                          , topic_id => TopicId
-                          , current_inflight => Inflight
-                          }),
-            {ok, Channel}
-    end;
+handle_in(?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED),
+          Channel = ?REGISTER_INFLIGHT(TopicId, TopicName)) ->
+    ?SLOG(debug, #{ msg => "register_topic_name_to_client_succesfully"
+                  , topic_id => TopicId
+                  , topic_name => TopicName
+                  }),
+    NChannel = cancel_timer(
+                 register_timer,
+                 Channel#channel{register_inflight = undefined}),
+    send_next_register_or_replay_publish(TopicName, NChannel);
 
 
-handle_in(?SN_REGACK_MSG(_TopicId, _MsgId, Reason), Channel) ->
+handle_in(?SN_REGACK_MSG(TopicId, _MsgId, Reason),
+          Channel = ?REGISTER_INFLIGHT(TopicId, TopicName)) ->
     case Reason of
     case Reason of
         ?SN_RC_CONGESTION ->
         ?SN_RC_CONGESTION ->
             %% TODO: a or b?
             %% TODO: a or b?
@@ -575,11 +571,28 @@ handle_in(?SN_REGACK_MSG(_TopicId, _MsgId, Reason), Channel) ->
             %% b. re-new the re-transmit timer
             %% b. re-new the re-transmit timer
             {ok, Channel};
             {ok, Channel};
         _ ->
         _ ->
-            %% disconnect this client, if the reason is
+            %% skipp this topic-name register, if the reason is
             %% ?SN_RC_NOT_SUPPORTED, ?SN_RC_INVALID_TOPIC_ID, etc.
             %% ?SN_RC_NOT_SUPPORTED, ?SN_RC_INVALID_TOPIC_ID, etc.
-            handle_out(disconnect, ?SN_RC_NOT_SUPPORTED, Channel)
+            ?SLOG(warning, #{ msg => "skipp_register_topic_name_to_client"
+                            , topic_id => TopicId
+                            , topic_name => TopicName
+                            }),
+            NChannel = cancel_timer(
+                         register_timer,
+                         Channel#channel{register_inflight = undefined}),
+            send_next_register_or_replay_publish(TopicName, NChannel)
     end;
     end;
 
 
+handle_in(?SN_REGACK_MSG(TopicId, MsgId, Reason),
+          Channel = #channel{register_inflight = Inflight}) ->
+    ?SLOG(error, #{ msg => "unexpected_regack_msg"
+                  , acked_msg_id => MsgId
+                  , acked_topic_id => TopicId
+                  , acked_reason => Reason
+                  , current_inflight => Inflight
+                  }),
+    {ok, Channel};
+
 handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
 handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
     TopicId = case is_integer(TopicId0) of
     TopicId = case is_integer(TopicId0) of
                   true -> TopicId0;
                   true -> TopicId0;
@@ -642,7 +655,7 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
                     %% involving the predefined topic name in register to
                     %% involving the predefined topic name in register to
                     %% enhance the gateway's robustness even inconsistent
                     %% enhance the gateway's robustness even inconsistent
                     %% with MQTT-SN channels
                     %% with MQTT-SN channels
-                    handle_out(register, {TopicId, MsgId, TopicName}, Channel)
+                    handle_out(register, {TopicId, TopicName}, Channel)
             end;
             end;
         _ ->
         _ ->
             ?SLOG(error, #{ msg => "cannt_handle_PUBACK"
             ?SLOG(error, #{ msg => "cannt_handle_PUBACK"
@@ -779,12 +792,10 @@ handle_in(?SN_PINGREQ_MSG(ClientId),
     awake(ClientId, Channel);
     awake(ClientId, Channel);
 
 
 handle_in(?SN_PINGREQ_MSG(ClientId),
 handle_in(?SN_PINGREQ_MSG(ClientId),
-          Channel = #channel{conn_state = ConnState}) ->
-    ?SLOG(error, #{ msg => "awake_pingreq_in_bad_conn_state"
-                  , conn_state => ConnState
-                  , clientid => ClientId
-                  }),
-    handle_out(disconnect, protocol_error, Channel);
+          Channel = #channel{
+                       conn_state = connected,
+                       clientinfo = #{clientid := ClientId}}) ->
+    {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
 
 
 handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) ->
 handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) ->
     handle_out(disconnect, normal, Channel);
     handle_out(disconnect, normal, Channel);
@@ -833,30 +844,18 @@ after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) ->
 outgoing_and_update(Pkt) ->
 outgoing_and_update(Pkt) ->
     [{outgoing, Pkt}, {event, update}].
     [{outgoing, Pkt}, {event, update}].
 
 
-send_next_register_or_replay_publish(TopicName,
-                                     Channel = #channel{
-                                                  session = Session,
-                                                  register_awaiting_queue = []}) ->
-    case emqx_inflight:to_list(emqx_session:info(inflight, Session)) of
-        [] -> {ok, Channel};
-        [{PktId, {inflight_data, _, Msg, _}}] ->
-            case TopicName =:= emqx_message:topic(Msg) of
-                false ->
-                    ?SLOG(warning, #{ msg => "replay_inflight_message_failed"
-                                    , acked_topic_name => TopicName
-                                    , inflight_message => Msg
-                                    }),
-                    {ok, Channel};
-                true ->
-                    NMsg = emqx_message:set_flag(dup, true, Msg),
-                    handle_out(publish, {PktId, NMsg}, Channel)
-            end
-    end;
-send_next_register_or_replay_publish(_TopicName,
-                                     Channel = #channel{
-                                                  register_awaiting_queue = RAQueue}) ->
+send_next_register_or_replay_publish(
+  _TopicName,
+  Channel = #channel{ register_awaiting_queue = []}) ->
+    {Outgoing, NChannel} = resume_or_replay_messages(Channel),
+    {ok, Outgoing, NChannel};
+
+send_next_register_or_replay_publish(
+  _TopicName,
+  Channel = #channel{register_awaiting_queue = RAQueue}) ->
     [RegisterReq | NRAQueue] = RAQueue,
     [RegisterReq | NRAQueue] = RAQueue,
-    handle_out(register, RegisterReq, Channel#channel{register_awaiting_queue = NRAQueue}).
+    handle_out(register, RegisterReq,
+               Channel#channel{register_awaiting_queue = NRAQueue}).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle Publish
 %% Handle Publish
@@ -1263,36 +1262,32 @@ handle_out(pubrel, MsgId, Channel) ->
 handle_out(pubcomp, MsgId, Channel) ->
 handle_out(pubcomp, MsgId, Channel) ->
     {ok, {outgoing, ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)}, Channel};
     {ok, {outgoing, ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)}, Channel};
 
 
-handle_out(register, {TopicId, MsgId, TopicName},
-           Channel = #channel{register_inflight = undefined}) ->
+handle_out(register, {TopicId, TopicName},
+           Channel = #channel{session = Session,
+                              register_inflight = undefined}) ->
+    {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session),
     Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)},
     Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)},
-    NChannel = Channel#channel{register_inflight = {TopicId, MsgId, TopicName}},
-    {ok, Outgoing, ensure_timer(register_timer, ?REGISTER_TIMEOUT, NChannel)};
+    NChannel = Channel#channel{
+                 session = NSession,
+                 register_inflight = {TopicId, MsgId, TopicName}},
+    {ok, Outgoing, ensure_register_timer(NChannel)};
 
 
-handle_out(register, {TopicId, MsgId, TopicName},
+handle_out(register, {TopicId, TopicName},
            Channel = #channel{register_inflight = Inflight,
            Channel = #channel{register_inflight = Inflight,
                               register_awaiting_queue = RAQueue}) ->
                               register_awaiting_queue = RAQueue}) ->
-    case Inflight of
-        {_, _, TopicName} ->
-            ?SLOG(debug, #{ msg => "ingore_handle_out_register"
-                          , requested_register_msg =>
+    case enqueue_register_request({TopicId, TopicName}, Inflight, RAQueue) of
+        ignore ->
+            ?SLOG(debug, #{ msg => "ingore_register_request_to_client"
+                          , register_request =>
                              #{ topic_id => TopicId
                              #{ topic_id => TopicId
-                              , msg_id => MsgId
                               , topic_name => TopicName
                               , topic_name => TopicName
                               }
                               }
                           }),
                           }),
             {ok, Channel};
             {ok, Channel};
-        {InflightTopicId, InflightMsgId, InflightTopicName} ->
-            NRAQueue = RAQueue ++ [{TopicId, MsgId, TopicName}],
+        NRAQueue ->
             ?SLOG(debug, #{ msg => "put_register_msg_into_awaiting_queue"
             ?SLOG(debug, #{ msg => "put_register_msg_into_awaiting_queue"
-                          , inflight_register_msg =>
-                             #{ topic_id => InflightTopicId
-                              , msg_id => InflightMsgId
-                              , topic_name => InflightTopicName
-                              }
-                          , queued_register_msg =>
+                          , register_request =>
                              #{ topic_id => TopicId
                              #{ topic_id => TopicId
-                              , msg_id => MsgId
                               , topic_name => TopicName
                               , topic_name => TopicName
                               }
                               }
                           , register_awaiting_queue_size => length(NRAQueue)
                           , register_awaiting_queue_size => length(NRAQueue)
@@ -1302,7 +1297,20 @@ handle_out(register, {TopicId, MsgId, TopicName},
 
 
 handle_out(disconnect, RC, Channel) ->
 handle_out(disconnect, RC, Channel) ->
     DisPkt = ?SN_DISCONNECT_MSG(undefined),
     DisPkt = ?SN_DISCONNECT_MSG(undefined),
-    {ok, [{outgoing, DisPkt}, {close, RC}], Channel}.
+    Reason = case is_atom(RC) of
+                 true -> RC;
+                 false -> returncode_name(RC)
+             end,
+    {ok, [{outgoing, DisPkt}, {close, Reason}], Channel}.
+
+enqueue_register_request({_, TopicName}, {_, _, TopicName}, _RAQueue) ->
+    ignore;
+enqueue_register_request({TopicId, TopicName}, _, RAQueue) ->
+    HasQueued = lists:any(fun({_, T}) -> T == TopicName end, RAQueue),
+    case HasQueued of
+        true -> ignore;
+        false -> RAQueue ++ [{TopicId, TopicName}]
+    end.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Return ConnAck
 %% Return ConnAck
@@ -1310,35 +1318,53 @@ handle_out(disconnect, RC, Channel) ->
 
 
 return_connack(AckPacket, Channel) ->
 return_connack(AckPacket, Channel) ->
     Replies1 = [{event, connected}, {outgoing, AckPacket}],
     Replies1 = [{event, connected}, {outgoing, AckPacket}],
-    case maybe_resume_session(Channel) of
-        ignore -> {ok, Replies1, Channel};
-        {ok, Publishes, NSession} ->
-            NChannel = Channel#channel{session  = NSession,
-                                       resuming = false,
-                                       pendings = []
-                                      },
-            {Replies2, NChannel1} = outgoing_deliver_and_register(
-                                      do_deliver(Publishes, NChannel)
-                                     ),
-            {ok, Replies1 ++ Replies2, NChannel1}
-    end.
+    {Replies2, NChannel} = maybe_resume_session(Channel),
+    {ok, Replies1 ++ Replies2, NChannel}.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Maybe Resume Session
 %% Maybe Resume Session
 
 
-maybe_resume_session(#channel{resuming = false}) ->
-    ignore;
-maybe_resume_session(#channel{session  = Session,
-                              resuming = true,
-                              pendings = Pendings, clientinfo = ClientInfo}) ->
-    {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
-    case emqx_session:deliver(ClientInfo, Pendings, Session1) of
-        {ok, Session2} ->
-            {ok, Publishes, Session2};
-        {ok, More, Session2} ->
-            {ok, lists:append(Publishes, More), Session2}
+maybe_resume_session(Channel = #channel{resuming = false}) ->
+    {[], Channel};
+maybe_resume_session(Channel = #channel{session  = Session,
+                                        resuming = true}) ->
+    Subs = emqx_session:info(subscriptions, Session),
+    case subs_resume() andalso map_size(Subs) =/= 0 of
+        true ->
+            TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T)
+                                      end, maps:keys(Subs)),
+            Registers = lists:map(fun(T) -> {register, T} end, TopicNames),
+            {Registers, Channel};
+        false ->
+            resume_or_replay_messages(Channel)
     end.
     end.
 
 
+resume_or_replay_messages(Channel = #channel{
+                                       resuming = Resuming,
+                                       pendings = Pendings,
+                                       session = Session,
+                                       clientinfo = ClientInfo}) ->
+    {NPendings, NChannel} =
+        case Resuming of
+            true ->
+                {Pendings, Channel#channel{resuming = false, pendings = []}};
+            false ->
+                {[], Channel}
+        end,
+    {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
+    {NPublishes, NSession} =
+        case emqx_session:deliver(ClientInfo, NPendings, Session1) of
+            {ok, Session2} ->
+                {Publishes, Session2};
+            {ok, More, Session2} ->
+                {lists:append(Publishes, More), Session2}
+        end,
+    outgoing_deliver_and_register(
+      do_deliver(NPublishes, NChannel#channel{session = NSession})).
+
+subs_resume() ->
+    emqx:get_config([gateway, mqttsn, subs_resume]).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Deliver publish: broker -> client
 %% Deliver publish: broker -> client
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -1539,32 +1565,16 @@ handle_info(clean_authz_cache, Channel) ->
 handle_info({subscribe, _}, Channel) ->
 handle_info({subscribe, _}, Channel) ->
    {ok, Channel};
    {ok, Channel};
 
 
-handle_info({register, TopicName},
-            Channel = #channel{
-                         registry = Registry,
-                         session = Session}) ->
-    ClientId = clientid(Channel),
-    case emqx_sn_registry:lookup_topic_id(Registry,  ClientId, TopicName) of
-        undefined ->
-            case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
-                {error, Reason} ->
-                    ?SLOG(error, #{ msg => "register_topic_failed"
-                                  , topic_name => TopicName
-                                  , reason => Reason
-                                  }),
-                    {ok, Channel};
-                TopicId ->
-                    {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session),
-                    handle_out(
-                      register,
-                      {TopicId, MsgId, TopicName},
-                      Channel#channel{session = NSession})
-            end;
-        Registered ->
-            ?SLOG(debug, #{ msg => "ignore_register_request"
-                          , registered_as => Registered
+handle_info({register, TopicName}, Channel) ->
+    case ensure_registered_topic_name(TopicName, Channel) of
+        {error, Reason} ->
+            ?SLOG(error, #{ msg => "register_topic_failed"
+                          , topic_name => TopicName
+                          , reason => Reason
                           }),
                           }),
-            {ok, Channel}
+            {ok, Channel};
+        {ok, TopicId} ->
+            handle_out(register, {TopicId, TopicName}, Channel)
     end;
     end;
 
 
 handle_info(Info, Channel) ->
 handle_info(Info, Channel) ->
@@ -1581,6 +1591,19 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
         _ -> shutdown(Reason, Channel)
         _ -> shutdown(Reason, Channel)
     end.
     end.
 
 
+ensure_registered_topic_name(TopicName,
+                             Channel = #channel{registry = Registry}) ->
+    ClientId = clientid(Channel),
+    case emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) of
+        undefined ->
+            case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
+                {error, Reason} -> {error, Reason};
+                TopicId -> {ok, TopicId}
+            end;
+        TopicId ->
+            {ok, TopicId}
+    end.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Ensure disconnected
 %% Ensure disconnected
 
 
@@ -1623,12 +1646,17 @@ handle_deliver(Delivers, Channel = #channel{
                 ),
                 ),
     {ok, Channel#channel{session = NSession}};
     {ok, Channel#channel{session = NSession}};
 
 
+%% There are two secensar need to cache delivering messages:
+%%  1. it is being takeover by other channel
+%%  2. it is being resume registered topic-names
 handle_deliver(Delivers, Channel = #channel{
 handle_deliver(Delivers, Channel = #channel{
                                       ctx = Ctx,
                                       ctx = Ctx,
-                                      takeover = true,
+                                      takeover = Takeover,
                                       pendings = Pendings,
                                       pendings = Pendings,
                                       session = Session,
                                       session = Session,
-                                      clientinfo = #{clientid := ClientId}}) ->
+                                      resuming = Resuming,
+                                      clientinfo = #{clientid := ClientId}})
+  when Takeover == true; Resuming == true ->
     NPendings = lists:append(
     NPendings = lists:append(
                   Pendings,
                   Pendings,
                   ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx)
                   ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx)
@@ -1680,7 +1708,6 @@ not_nacked({deliver, _Topic, Msg}) ->
       -> {ok, channel()}
       -> {ok, channel()}
        | {ok, replies(), channel()}
        | {ok, replies(), channel()}
        | {shutdown, Reason :: term(), channel()}.
        | {shutdown, Reason :: term(), channel()}.
-
 handle_timeout(_TRef, {keepalive, _StatVal},
 handle_timeout(_TRef, {keepalive, _StatVal},
                Channel = #channel{keepalive = undefined}) ->
                Channel = #channel{keepalive = undefined}) ->
     {ok, Channel};
     {ok, Channel};
@@ -1731,6 +1758,23 @@ handle_timeout(_TRef, expire_awaiting_rel,
             {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
             {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
     end;
     end;
 
 
+handle_timeout(_TRef, {retry_register, RetryTimes},
+               Channel = #channel{register_inflight = {TopicId, MsgId, TopicName}}) ->
+    case RetryTimes < ?MAX_RETRY_TIMES of
+        true ->
+            Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)},
+            {ok, Outgoing, ensure_register_timer(RetryTimes + 1, Channel)};
+        false ->
+            ?SLOG(error, #{ msg => "register_topic_reached_max_retry_times"
+                          , register_request =>
+                             #{ topic_id => TopicId
+                              , msg_id => MsgId
+                              , topic_name => TopicName
+                              }
+                          }),
+            handle_out(disconnect, ?SN_RC2_REACHED_MAX_RETRY, Channel)
+    end;
+
 handle_timeout(_TRef, expire_session, Channel) ->
 handle_timeout(_TRef, expire_session, Channel) ->
     shutdown(expired, Channel);
     shutdown(expired, Channel);
 
 
@@ -1788,6 +1832,14 @@ ensure_asleep_timer(Durtion, Channel) ->
     ensure_timer(asleep_timer, timer:seconds(Durtion),
     ensure_timer(asleep_timer, timer:seconds(Durtion),
                  Channel#channel{asleep_timer_duration = Durtion}).
                  Channel#channel{asleep_timer_duration = Durtion}).
 
 
+ensure_register_timer(Channel) ->
+    ensure_register_timer(0, Channel).
+
+ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) ->
+    Msg = maps:get(register_timer, ?TIMER_TABLE),
+    TRef = emqx_misc:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}),
+    Channel#channel{timers = Timers#{register_timer => TRef}}.
+
 cancel_timer(Name, Channel = #channel{timers = Timers}) ->
 cancel_timer(Name, Channel = #channel{timers = Timers}) ->
     case maps:get(Name, Timers, undefined) of
     case maps:get(Name, Timers, undefined) of
         undefined ->
         undefined ->
@@ -1858,4 +1910,5 @@ returncode_name(?SN_RC2_NOT_AUTHORIZE) -> rejected_not_authorize;
 returncode_name(?SN_RC2_FAILED_SESSION) -> rejected_failed_open_session;
 returncode_name(?SN_RC2_FAILED_SESSION) -> rejected_failed_open_session;
 returncode_name(?SN_RC2_KEEPALIVE_TIMEOUT) -> rejected_keepalive_timeout;
 returncode_name(?SN_RC2_KEEPALIVE_TIMEOUT) -> rejected_keepalive_timeout;
 returncode_name(?SN_RC2_EXCEED_LIMITATION) -> rejected_exceed_limitation;
 returncode_name(?SN_RC2_EXCEED_LIMITATION) -> rejected_exceed_limitation;
+returncode_name(?SN_RC2_REACHED_MAX_RETRY) -> reached_max_retry_times;
 returncode_name(_) -> accepted.
 returncode_name(_) -> accepted.

+ 1 - 0
apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl

@@ -58,6 +58,7 @@
 -define(SN_RC2_FAILED_SESSION,    16#81).
 -define(SN_RC2_FAILED_SESSION,    16#81).
 -define(SN_RC2_KEEPALIVE_TIMEOUT, 16#82).
 -define(SN_RC2_KEEPALIVE_TIMEOUT, 16#82).
 -define(SN_RC2_EXCEED_LIMITATION, 16#83).
 -define(SN_RC2_EXCEED_LIMITATION, 16#83).
+-define(SN_RC2_REACHED_MAX_RETRY, 16#84).
 
 
 -define(QOS_NEG1, 3).
 -define(QOS_NEG1, 3).
 
 

+ 338 - 110
apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl

@@ -89,12 +89,22 @@ all() ->
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
     ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
-    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_gateway]),
     Config.
     Config.
 
 
 end_per_suite(_) ->
 end_per_suite(_) ->
     {ok, _} = emqx:remove_config([gateway, mqttsn]),
     {ok, _} = emqx:remove_config([gateway, mqttsn]),
-    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_conf]).
+
+restart_mqttsn_with_subs_resume_on() ->
+    emqx_gateway_conf:update_gateway(
+      mqttsn,
+      #{<<"subs_resume">> => <<"true">>}).
+
+restart_mqttsn_with_subs_resume_off() ->
+    emqx_gateway_conf:update_gateway(
+      mqttsn,
+      #{<<"subs_resume">> => <<"false">>}).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Test cases
 %% Test cases
@@ -992,8 +1002,8 @@ t_delivery_qos1_register_invalid_topic_id(_) ->
     %% acked with ?SN_RC_INVALID_TOPIC_ID to
     %% acked with ?SN_RC_INVALID_TOPIC_ID to
     send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID),
     send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID),
 
 
-    ?assertEqual(
-       {TopicId, MsgId},
+    ?assertMatch(
+       {TopicId, _},
        check_register_msg_on_udp(<<"ab">>, receive_response(Socket))),
        check_register_msg_on_udp(<<"ab">>, receive_response(Socket))),
     send_regack_msg(Socket, TopicId, MsgId + 1),
     send_regack_msg(Socket, TopicId, MsgId + 1),
 
 
@@ -1008,111 +1018,6 @@ t_delivery_qos1_register_invalid_topic_id(_) ->
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
     gen_udp:close(Socket).
     gen_udp:close(Socket).
 
 
-t_delivery_takeover_and_re_register(_) ->
-    MsgId = 1,
-    {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>, 0),
-    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
-                 receive_response(Socket)),
-
-    send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
-    <<_, ?SN_SUBACK, 2#00100000,
-      TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
-
-    send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
-    <<_, ?SN_SUBACK, 2#01000000,
-      TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
-
-    _ = emqx:publish(
-          emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
-    _ = emqx:publish(
-          emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
-
-    <<_, ?SN_PUBLISH, 2#00100000,
-      TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
-    send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
-
-    <<_, ?SN_PUBLISH, 2#01000000,
-      TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
-    send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
-
-    send_disconnect_msg(Socket, undefined),
-    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
-    gen_udp:close(Socket),
-
-    %% offline messages will be queued into the MQTT-SN session
-    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
-    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
-    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
-    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
-    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
-    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
-
-    {ok, NSocket} = gen_udp:open(0, [binary]),
-    send_connect_msg(NSocket, <<"test">>, 0),
-    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
-                 receive_response(NSocket)),
-
-    %% qos1
-
-    %% received the resume messages
-    <<_, ?SN_PUBLISH, 2#00100000,
-      TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
-    %% only one qos1/qos2 inflight
-    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
-    send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
-    %% recv register
-    <<_, ?SN_REGISTER,
-      TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
-    send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
-    %% received the replay messages
-    <<_, ?SN_PUBLISH, 2#00100000,
-      TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
-    send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
-
-    <<_, ?SN_PUBLISH, 2#00100000,
-      TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
-    send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
-
-    <<_, ?SN_PUBLISH, 2#00100000,
-      TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
-    send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
-
-    %% qos2
-    <<_, ?SN_PUBLISH, 2#01000000,
-      TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
-    %% only one qos1/qos2 inflight
-    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
-    send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
-    %% recv register
-    <<_, ?SN_REGISTER,
-      TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
-    send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
-    %% received the replay messages
-    <<_, ?SN_PUBLISH, 2#01000000,
-      TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
-    send_pubrec_msg(NSocket, MsgIdB1),
-    <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
-    send_pubcomp_msg(NSocket, MsgIdB1),
-
-    <<_, ?SN_PUBLISH, 2#01000000,
-      TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
-    send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
-
-    <<_, ?SN_PUBLISH, 2#01000000,
-      TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
-    send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
-
-    %% no more messages
-    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
-
-    send_disconnect_msg(NSocket, undefined),
-    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
-
-    _ = emqx_gateway_cm:kick_session(mqttsn, <<"test">>),
-
-    gen_udp:close(NSocket).
-
 t_will_case01(_) ->
 t_will_case01(_) ->
     QoS = 1,
     QoS = 1,
     Duration = 1,
     Duration = 1,
@@ -1924,6 +1829,326 @@ t_broadcast_test1(_) ->
     timer:sleep(600),
     timer:sleep(600),
     gen_udp:close(Socket).
     gen_udp:close(Socket).
 
 
+t_register_subs_resume_on(_) ->
+    restart_mqttsn_with_subs_resume_on(),
+    MsgId = 1,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(Socket)),
+
+    send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
+    <<_, ?SN_SUBACK, 2#01000000,
+      TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
+
+    send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
+    <<_, ?SN_SUBACK, 2#01000000,
+      TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
+
+    _ = emqx:publish(
+          emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
+    _ = emqx:publish(
+          emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
+    send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
+    send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
+
+    send_disconnect_msg(Socket, undefined),
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+    gen_udp:close(Socket),
+
+    %% offline messages will be queued into the MQTT-SN session
+    _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
+
+    emqx_logger:set_log_level(debug),
+
+    {ok, NSocket} = gen_udp:open(0, [binary]),
+    send_connect_msg(NSocket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(NSocket)),
+
+    %% receive subs register requests
+    <<_, ?SN_REGISTER,
+      TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
+    send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
+
+    <<_, ?SN_REGISTER,
+      TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
+    send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
+
+    %% receive the queued messages
+
+    <<_, ?SN_PUBLISH, 2#00000000,
+      TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
+
+    <<_, ?SN_PUBLISH, 2#01000000,
+      TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket),
+    send_pubrec_msg(NSocket, MsgIdA2),
+    <<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket),
+    send_pubcomp_msg(NSocket, MsgIdA2),
+
+
+    <<_, ?SN_PUBLISH, 2#00000000,
+      TopicIdB:16, 0:16, "m1">> = receive_response(NSocket),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED),
+
+    <<_, ?SN_PUBLISH, 2#01000000,
+      TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket),
+    send_pubrec_msg(NSocket, MsgIdB2),
+    <<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket),
+    send_pubcomp_msg(NSocket, MsgIdB2),
+
+    %% no more messages
+    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
+
+    gen_udp:close(NSocket),
+    {ok, NSocket1} = gen_udp:open(0, [binary]),
+    send_connect_msg(NSocket1, <<"test">>),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(NSocket1)),
+    send_disconnect_msg(NSocket1, undefined),
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
+    gen_udp:close(NSocket1),
+    restart_mqttsn_with_subs_resume_off().
+
+t_register_subs_resume_off(_) ->
+    MsgId = 1,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(Socket)),
+
+    send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
+    <<_, ?SN_SUBACK, 2#00100000,
+      TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
+
+    send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
+    <<_, ?SN_SUBACK, 2#01000000,
+      TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
+
+    _ = emqx:publish(
+          emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
+    _ = emqx:publish(
+          emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
+    send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
+
+    <<_, ?SN_PUBLISH, 2#01000000,
+      TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
+    send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
+
+    send_disconnect_msg(Socket, undefined),
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+    gen_udp:close(Socket),
+
+    %% offline messages will be queued into the MQTT-SN session
+    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
+
+    {ok, NSocket} = gen_udp:open(0, [binary]),
+    send_connect_msg(NSocket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(NSocket)),
+
+    %% qos1
+
+    %% received the resume messages
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
+    %% only one qos1/qos2 inflight
+    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
+    send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
+    %% recv register
+    <<_, ?SN_REGISTER,
+      TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
+    send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
+    %% received the replay messages
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
+
+    %% qos2
+    <<_, ?SN_PUBLISH, 2#01000000,
+      TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
+    %% only one qos1/qos2 inflight
+    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
+    send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
+    %% recv register
+    <<_, ?SN_REGISTER,
+      TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
+    send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
+    %% received the replay messages
+    <<_, ?SN_PUBLISH, 2#01000000,
+      TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
+    send_pubrec_msg(NSocket, MsgIdB1),
+    <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
+    send_pubcomp_msg(NSocket, MsgIdB1),
+
+    <<_, ?SN_PUBLISH, 2#01000000,
+      TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
+
+    <<_, ?SN_PUBLISH, 2#01000000,
+      TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
+
+    %% no more messages
+    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
+
+    gen_udp:close(NSocket),
+    {ok, NSocket1} = gen_udp:open(0, [binary]),
+    send_connect_msg(NSocket1, <<"test">>),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(NSocket1)),
+    send_disconnect_msg(NSocket1, undefined),
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
+    gen_udp:close(NSocket1).
+
+t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
+    restart_mqttsn_with_subs_resume_on(),
+    MsgId = 1,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(Socket)),
+
+    send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
+    <<_, ?SN_SUBACK, 2#01000000,
+      TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
+
+    send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
+    <<_, ?SN_SUBACK, 2#01000000,
+      TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
+
+    send_disconnect_msg(Socket, undefined),
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+    gen_udp:close(Socket),
+
+    emqx_logger:set_log_level(debug),
+
+    {ok, NSocket} = gen_udp:open(0, [binary]),
+    send_connect_msg(NSocket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(NSocket)),
+
+    %% receive subs register requests
+
+    %% registered failured topic-name will be skipped
+    <<_, ?SN_REGISTER,
+      TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
+    send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID),
+
+    %% the gateway try to shutdown this client if it reached max-retry-times
+    %%
+    %% times-0
+    <<_, ?SN_REGISTER,
+      TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
+    %% times-1
+    timer:sleep(5000), %% RETYRY_TIMEOUT
+    <<_, ?SN_REGISTER,
+      TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
+    %% times-2
+    timer:sleep(5000), %% RETYRY_TIMEOUT
+    <<_, ?SN_REGISTER,
+      TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
+    %% just a ping
+    send_pingreq_msg(NSocket, <<"test">>),
+    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)),
+    %% times-3
+    timer:sleep(5000), %% RETYRY_TIMEOUT
+    <<_, ?SN_REGISTER,
+      TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
+    %% shutdown due to reached max retry times
+    timer:sleep(5000), %% RETYRY_TIMEOUT
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
+    gen_udp:close(NSocket),
+    restart_mqttsn_with_subs_resume_off().
+
+t_register_enqueue_delivering_messages(_) ->
+    restart_mqttsn_with_subs_resume_on(),
+    MsgId = 1,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(Socket)),
+
+    send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
+    <<_, ?SN_SUBACK, 2#01000000,
+      TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
+
+    send_disconnect_msg(Socket, undefined),
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+    gen_udp:close(Socket),
+
+    {ok, NSocket} = gen_udp:open(0, [binary]),
+    send_connect_msg(NSocket, <<"test">>, 0),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(NSocket)),
+
+    %% receive subs register requests
+
+    %% registered failured topic-name will be skipped
+    <<_, ?SN_REGISTER,
+      TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
+
+    _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
+    _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
+
+    send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED),
+
+    %% receive the queued messages
+
+    <<_, ?SN_PUBLISH, 2#00000000,
+      TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
+
+    <<_, ?SN_PUBLISH, 2#00100000,
+      TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
+    send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
+
+    %% no more messages
+    ?assertEqual(udp_receive_timeout, receive_response(NSocket)),
+
+    gen_udp:close(NSocket),
+    {ok, NSocket1} = gen_udp:open(0, [binary]),
+    send_connect_msg(NSocket1, <<"test">>),
+    ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
+                 receive_response(NSocket1)),
+    send_disconnect_msg(NSocket1, undefined),
+    ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
+    gen_udp:close(NSocket1),
+    restart_mqttsn_with_subs_resume_off().
+
 t_socket_passvice(_) ->
 t_socket_passvice(_) ->
     %% TODO: test this gateway enter the passvie event
     %% TODO: test this gateway enter the passvie event
     ok.
     ok.
@@ -2082,9 +2307,12 @@ send_register_msg(Socket, TopicName, MsgId) ->
     ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket).
     ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket).
 
 
 send_regack_msg(Socket, TopicId, MsgId) ->
 send_regack_msg(Socket, TopicId, MsgId) ->
+    send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
+
+send_regack_msg(Socket, TopicId, MsgId, Rc) ->
     Length = 7,
     Length = 7,
     MsgType = ?SN_REGACK,
     MsgType = ?SN_REGACK,
-    Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
+    Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc>>,
     ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
     ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
 
 
 send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->
 send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->