Przeglądaj źródła

Merge pull request #10951 from HJianBo/mqttsn-mountpoint-pub

fix(mqttsn): make mountpoint works for publish
JianBo He 2 lat temu
rodzic
commit
21b451a4c2

+ 1 - 1
apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_mqttsn, [
     {description, "MQTT-SN Gateway"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 3 - 2
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -1111,15 +1111,16 @@ check_pub_authz(
 
 convert_pub_to_msg(
     {TopicName, Flags, Data},
-    Channel = #channel{clientinfo = #{clientid := ClientId}}
+    Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}}
 ) ->
     #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
     NewQoS = get_corrected_qos(QoS),
+    NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
     Message = put_message_headers(
         emqx_message:make(
             ClientId,
             NewQoS,
-            TopicName,
+            NTopicName,
             Data,
             #{dup => Dup, retain => Retain},
             #{}

+ 45 - 0
apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl

@@ -120,6 +120,13 @@ restart_mqttsn_with_subs_resume_off() ->
         Conf#{<<"subs_resume">> => <<"false">>}
     ).
 
+restart_mqttsn_with_mountpoint(Mp) ->
+    Conf = emqx:get_raw_config([gateway, mqttsn]),
+    emqx_gateway_conf:update_gateway(
+        mqttsn,
+        Conf#{<<"mountpoint">> => Mp}
+    ).
+
 default_config() ->
     ?CONF_DEFAULT.
 
@@ -990,6 +997,44 @@ t_publish_qos2_case03(_) ->
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
     gen_udp:close(Socket).
 
+t_publish_mountpoint(_) ->
+    restart_mqttsn_with_mountpoint(<<"mp/">>),
+    Dup = 0,
+    QoS = 1,
+    Retain = 0,
+    Will = 0,
+    CleanSession = 0,
+    MsgId = 1,
+    TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
+    Topic = <<"abc">>,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+    send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
+    ?assertEqual(
+        <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
+            TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
+        receive_response(Socket)
+    ),
+
+    Payload1 = <<20, 21, 22, 23>>,
+    send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1),
+    ?assertEqual(
+        <<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)
+    ),
+    timer:sleep(100),
+
+    ?assertEqual(
+        <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
+            TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>,
+        receive_response(Socket)
+    ),
+
+    send_disconnect_msg(Socket, undefined),
+    restart_mqttsn_with_mountpoint(<<>>),
+    gen_udp:close(Socket).
+
 t_delivery_qos1_register_invalid_topic_id(_) ->
     Dup = 0,
     QoS = 1,

+ 1 - 0
changes/ce/fix-10951.en.md

@@ -0,0 +1 @@
+Fix the issue in MQTT-SN gateway where the `mountpoint` does not take effect on message publishing.