فهرست منبع

feat(mqttsn): support to register unknown topic-name to the client

JianBo He 4 سال پیش
والد
کامیت
7ade24b344

+ 79 - 24
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -338,9 +338,16 @@ process_connect(Channel = #channel{
            SessFun
           ) of
         {ok, #{session := Session,
-               present := _Present}} ->
+               present := false}} ->
             handle_out(connack, ?SN_RC_ACCEPTED,
                        Channel#channel{session = Session});
+        {ok, #{session := Session, present := true, pendings := Pendings}} ->
+            Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
+            NChannel = Channel#channel{session  = Session,
+                                       resuming = true,
+                                       pendings = Pendings1
+                                      },
+            handle_out(connack, ?SN_RC_ACCEPTED, NChannel);
         {error, Reason} ->
             ?SLOG(error, #{ msg => "failed_to_open_session"
                           , reason => Reason
@@ -1101,10 +1108,11 @@ awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
         {ok, More, Session2} ->
             {lists:append(Publishes, More), Session2}
     end,
-    {Packets, NChannel} = do_deliver(NPublishes,
-                                     Channel#channel{session = NSession}),
-    Outgoing = [{outgoing, Packets} || length(Packets) > 0],
-    {ok, Outgoing, NChannel}.
+    {Replies, NChannel} = outgoing_deliver_and_register(
+                            do_deliver(NPublishes,
+                                       Channel#channel{session = NSession})
+                           ),
+    {ok, Replies, NChannel}.
 
 asleep(Duration, Channel = #channel{conn_state = asleep}) ->
     %% 6.14: The client can also modify its sleep duration
@@ -1146,8 +1154,10 @@ handle_out(connack, ReasonCode,
     shutdown(Reason, AckPacket, Channel);
 
 handle_out(publish, Publishes, Channel) ->
-    {Packets, NChannel} = do_deliver(Publishes, Channel),
-    {ok, {outgoing, Packets}, NChannel};
+    {Replies, NChannel} = outgoing_deliver_and_register(
+                            do_deliver(Publishes, Channel)
+                           ),
+    {ok, Replies, NChannel};
 
 handle_out(puback, {TopicId, MsgId, Rc}, Channel) ->
     {ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel};
@@ -1207,17 +1217,18 @@ handle_out(disconnect, RC, Channel) ->
 %%--------------------------------------------------------------------
 
 return_connack(AckPacket, Channel) ->
-    Replies = [{event, connected}, {outgoing, AckPacket}],
+    Replies1 = [{event, connected}, {outgoing, AckPacket}],
     case maybe_resume_session(Channel) of
-        ignore -> {ok, Replies, Channel};
+        ignore -> {ok, Replies1, Channel};
         {ok, Publishes, NSession} ->
             NChannel = Channel#channel{session  = NSession,
                                        resuming = false,
                                        pendings = []
                                       },
-            {Packets, NChannel1} = do_deliver(Publishes, NChannel),
-            Outgoing = [{outgoing, Packets} || length(Packets) > 0],
-            {ok, Replies ++ Outgoing, NChannel1}
+            {Replies2, NChannel1} = outgoing_deliver_and_register(
+                                      do_deliver(Publishes, NChannel)
+                                     ),
+            {ok, Replies1 ++ Replies2, NChannel1}
     end.
 
 %%--------------------------------------------------------------------
@@ -1240,7 +1251,6 @@ maybe_resume_session(#channel{session  = Session,
 %% Deliver publish: broker -> client
 %%--------------------------------------------------------------------
 
-%% return list(emqx_types:packet())
 do_deliver({pubrel, MsgId}, Channel) ->
     {[?SN_PUBREC_MSG(?SN_PUBREL, MsgId)], Channel};
 
@@ -1271,6 +1281,18 @@ do_deliver(Publishes, Channel) when is_list(Publishes) ->
         end, {[], Channel}, Publishes),
     {lists:reverse(Packets), NChannel}.
 
+outgoing_deliver_and_register({Packets, Channel}) ->
+    {NPackets, NRegisters} =
+        lists:foldl(fun(P, {Acc0, Acc1}) ->
+            case P of
+                {register, _} ->
+                    {Acc0, [P|Acc1]};
+                _ ->
+                    {[P|Acc0], Acc1}
+            end
+         end, {[], []}, Packets),
+    {[{outgoing, lists:reverse(NPackets)}] ++ lists:reverse(NRegisters), Channel}.
+
 message_to_packet(MsgId, Message,
                   #channel{registry = Registry,
                            clientinfo = #{clientid := ClientId}}) ->
@@ -1281,17 +1303,19 @@ message_to_packet(MsgId, Message,
                    ?QOS_0 -> 0;
                    _ -> MsgId
                end,
-    {TopicIdType, NTopicId} =
-        case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of
-            {predef, PredefTopicId} ->
-                {?SN_PREDEFINED_TOPIC, PredefTopicId};
-            TopicId when is_integer(TopicId) ->
-                {?SN_NORMAL_TOPIC, TopicId};
-            undefined ->
-                {?SN_SHORT_TOPIC, Topic}
-        end,
-    Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType},
-    ?SN_PUBLISH_MSG(Flags, NTopicId, NMsgId, Payload).
+    case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of
+        {predef, PredefTopicId} ->
+            Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC},
+            ?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload);
+        TopicId when is_integer(TopicId) ->
+            Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_NORMAL_TOPIC},
+            ?SN_PUBLISH_MSG(Flags, TopicId, NMsgId, Payload);
+        undefined when byte_size(Topic) =:= 2 ->
+            Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_SHORT_TOPIC},
+            ?SN_PUBLISH_MSG(Flags, Topic, NMsgId, Payload);
+        undefined ->
+            {register, Topic}
+    end.
 
 %%--------------------------------------------------------------------
 %% Handle call
@@ -1423,6 +1447,34 @@ handle_info(clean_authz_cache, Channel) ->
 handle_info({subscribe, _}, Channel) ->
    {ok, Channel};
 
+handle_info({register, TopicName},
+            Channel = #channel{
+                         registry = Registry,
+                         session = Session}) ->
+    ClientId = clientid(Channel),
+    case emqx_sn_registry:lookup_topic_id(Registry,  ClientId, TopicName) of
+        undefined ->
+            case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
+                {error, Reason} ->
+                    ?SLOG(error, #{ msg => "register_topic_failed"
+                                  , topic_name => TopicName
+                                  , reason => Reason
+                                  }),
+                    {ok, Channel};
+                TopicId ->
+                    {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session),
+                    handle_out(
+                      register,
+                      {TopicId, MsgId, TopicName},
+                      Channel#channel{session = NSession})
+            end;
+        Registered ->
+            ?SLOG(debug, #{ msg => "ignore_register_request"
+                          , registered_as => Registered
+                          }),
+            {ok, Channel}
+    end;
+
 handle_info(Info, Channel) ->
     ?SLOG(error, #{ msg => "unexpected_info"
                   , info => Info
@@ -1678,6 +1730,9 @@ interval(await_timer, #channel{session = Session}) ->
 %% Helper functions
 %%--------------------------------------------------------------------
 
+clientid(#channel{clientinfo = #{clientid := ClientId}}) ->
+    ClientId.
+
 run_hooks(Ctx, Name, Args) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name),
     emqx_hooks:run(Name, Args).

+ 1 - 1
apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl

@@ -349,7 +349,7 @@ format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
                   [QoS, MsgId, TopicId, ReturnCode]);
 format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
     #mqtt_sn_flags{topic_id_type = TopicIdType} = Flags,
-    io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)",
+    io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~w, MsgId=~w, TopicId=~w)",
                   [TopicIdType, Msgid, Topic]);
 format(?SN_UNSUBACK_MSG(MsgId)) ->
     io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]);

+ 0 - 5
apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl

@@ -1048,11 +1048,6 @@ t_delivery_takeover_and_re_register(_) ->
     _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
     _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
 
-    emqx_logger:set_log_level(debug),
-    dbg:tracer(),dbg:p(all,call),
-    dbg:tp(emqx_gateway_cm,x),
-    %dbg:tpl(emqx_gateway_cm, request_stepdown,x),
-
     {ok, NSocket} = gen_udp:open(0, [binary]),
     send_connect_msg(NSocket, <<"test">>, 0),
     ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,