Bladeren bron

Merge pull request #12902 from HJianBo/stomp

Pass the Content-type of MQTT message to the Stomp message
JianBo He 1 jaar geleden
bovenliggende
commit
c5c4bb987f

+ 1 - 1
apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_stomp, [
     {description, "Stomp Gateway"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 8 - 1
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -1039,7 +1039,7 @@ handle_deliver(
                         {<<"subscription">>, Id},
                         {<<"message-id">>, next_msgid()},
                         {<<"destination">>, emqx_message:topic(NMessage)},
-                        {<<"content-type">>, <<"text/plain">>}
+                        {<<"content-type">>, content_type_from_mqtt_message(NMessage)}
                     ],
                     Headers1 =
                         case Ack of
@@ -1080,6 +1080,13 @@ handle_deliver(
     ),
     {ok, [{outgoing, lists:reverse(Frames0)}], Channel}.
 
+content_type_from_mqtt_message(Message) ->
+    Properties = emqx_message:get_header(properties, Message, #{}),
+    case maps:get('Content-Type', Properties, undefined) of
+        undefined -> <<"text/plain">>;
+        ContentType -> ContentType
+    end.
+
 %%--------------------------------------------------------------------
 %% Handle timeout
 %%--------------------------------------------------------------------

+ 61 - 0
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -289,6 +289,67 @@ t_subscribe_inuse(_) ->
     with_connection(TopicIdInuseViaHttp),
     with_connection(SubscriptionInuseViaHttp).
 
+t_receive_from_mqtt_publish(_) ->
+    with_connection(fun(Sock) ->
+        ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>),
+        ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
+
+        ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>),
+        ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
+
+        %% send mqtt publish with content-type
+        Msg = emqx_message:make(
+            _From = from_testsuite,
+            _QoS = 1,
+            _Topic = <<"/queue/foo">>,
+            _Payload = <<"hello">>,
+            _Flags = #{},
+            _Headers = #{properties => #{'Content-Type' => <<"application/json">>}}
+        ),
+        emqx:publish(Msg),
+
+        {ok, Frame} = recv_a_frame(Sock),
+        ?assertEqual(
+            <<"application/json">>,
+            proplists:get_value(<<"content-type">>, Frame#stomp_frame.headers)
+        ),
+
+        ?assertMatch(
+            #stomp_frame{
+                command = <<"MESSAGE">>,
+                headers = _,
+                body = <<"hello">>
+            },
+            Frame
+        ),
+        lists:foreach(
+            fun({Key, Val}) ->
+                Val = proplists:get_value(Key, Frame#stomp_frame.headers)
+            end,
+            [
+                {<<"destination">>, <<"/queue/foo">>},
+                {<<"subscription">>, <<"0">>}
+            ]
+        ),
+
+        %% assert subscription stats
+        [ClientInfo1] = clients(),
+        ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1),
+
+        %% Unsubscribe
+        ok = send_unsubscribe_frame(Sock, 0),
+        ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
+
+        %% assert subscription stats
+        [ClientInfo2] = clients(),
+        ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2),
+
+        ok = send_message_frame(Sock, <<"/queue/foo">>, <<"You will not receive this msg">>),
+        ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
+
+        {error, timeout} = gen_tcp:recv(Sock, 0, 500)
+    end).
+
 t_transaction(_) ->
     with_connection(fun(Sock) ->
         gen_tcp:send(

+ 1 - 0
changes/ce/fix-12902.md

@@ -0,0 +1 @@
+Pass the Content-type of MQTT message to the Stomp message.