Переглянути джерело

Merge pull request #11195 from HJianBo/stomp-topic-id-unique

Fix(stomp): avoid to create duplicated subscription by HTTP or Client
JianBo He 2 роки тому
батько
коміт
b11fa5f3bf

+ 1 - 1
apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_stomp, [
     {description, "Stomp Gateway"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 22 - 8
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -508,9 +508,13 @@ handle_in(
                     handle_out_and_update(receipt, receipt_id(Headers), NChannel1)
             end;
         {error, subscription_id_inused, NChannel} ->
-            ErrMsg = io_lib:format("Subscription id ~w is in used", [SubId]),
+            ErrMsg = io_lib:format("Subscription id ~s is in used", [SubId]),
             ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
             shutdown(subscription_id_inused, ErrorFrame, NChannel);
+        {error, topic_already_subscribed, NChannel} ->
+            ErrMsg = io_lib:format("Topic ~s already in subscribed", [Topic]),
+            ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
+            shutdown(topic_already_subscribed, ErrorFrame, NChannel);
         {error, acl_denied, NChannel} ->
             ErrMsg = io_lib:format("Insufficient permissions for ~s", [Topic]),
             ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
@@ -695,12 +699,15 @@ check_subscribed_status(
 ) ->
     MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
     case lists:keyfind(SubId, 1, Subs) of
-        {SubId, MountedTopic, _Ack, _} ->
-            ok;
-        {SubId, _OtherTopic, _Ack, _} ->
+        {SubId, _MountedTopic, _Ack, _} ->
             {error, subscription_id_inused};
         false ->
-            ok
+            case lists:keyfind(MountedTopic, 2, Subs) of
+                {_OtherSubId, MountedTopic, _Ack, _} ->
+                    {error, topic_already_subscribed};
+                false ->
+                    ok
+            end
     end.
 
 check_sub_acl(
@@ -826,13 +833,20 @@ handle_call(
                     NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts} | Subs],
                     NChannel1 = NChannel#channel{subscriptions = NSubs},
                     reply({ok, {MountedTopic, NSubOpts}}, [{event, updated}], NChannel1);
-                {error, ErrMsg, NChannel} ->
+                {error, ErrCode, NChannel} ->
                     ?SLOG(error, #{
                         msg => "failed_to_subscribe_topic",
                         topic => Topic,
-                        reason => ErrMsg
+                        reason => ErrCode
                     }),
-                    reply({error, ErrMsg}, NChannel)
+                    ErrMsg =
+                        case ErrCode of
+                            subscription_id_inused ->
+                                io_lib:format("Subscription id ~s is in used", [SubId]);
+                            topic_already_subscribed ->
+                                io_lib:format("Topic ~s already in subscribed", [Topic])
+                        end,
+                    reply({error, lists:flatten(ErrMsg)}, NChannel)
             end
     end;
 handle_call(

+ 106 - 76
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -162,62 +162,28 @@ t_heartbeat(_) ->
 
 t_subscribe(_) ->
     with_connection(fun(Sock) ->
-        gen_tcp:send(
-            Sock,
-            serialize(
-                <<"CONNECT">>,
-                [
-                    {<<"accept-version">>, ?STOMP_VER},
-                    {<<"host">>, <<"127.0.0.1:61613">>},
-                    {<<"login">>, <<"guest">>},
-                    {<<"passcode">>, <<"guest">>},
-                    {<<"heart-beat">>, <<"0,0">>}
-                ]
-            )
-        ),
-        {ok, Data} = gen_tcp:recv(Sock, 0),
-        {ok,
-            #stomp_frame{
-                command = <<"CONNECTED">>,
-                headers = _,
-                body = _
-            },
-            _, _} = parse(Data),
+        ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>),
+        ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
 
-        %% Subscribe
-        gen_tcp:send(
-            Sock,
-            serialize(
-                <<"SUBSCRIBE">>,
-                [
-                    {<<"id">>, 0},
-                    {<<"destination">>, <<"/queue/foo">>},
-                    {<<"ack">>, <<"auto">>}
-                ]
-            )
-        ),
+        ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>),
+        ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
 
         %% 'user-defined' header will be retain
-        gen_tcp:send(
-            Sock,
-            serialize(
-                <<"SEND">>,
-                [
-                    {<<"destination">>, <<"/queue/foo">>},
-                    {<<"user-defined">>, <<"emq">>}
-                ],
-                <<"hello">>
-            )
-        ),
+        ok = send_message_frame(Sock, <<"/queue/foo">>, <<"hello">>, [
+            {<<"user-defined">>, <<"emq">>}
+        ]),
+        ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
 
-        {ok, Data1} = gen_tcp:recv(Sock, 0, 1000),
-        {ok,
-            Frame = #stomp_frame{
+        {ok, Frame} = recv_a_frame(Sock),
+
+        ?assertMatch(
+            #stomp_frame{
                 command = <<"MESSAGE">>,
                 headers = _,
                 body = <<"hello">>
             },
-            _, _} = parse(Data1),
+            Frame
+        ),
         lists:foreach(
             fun({Key, Val}) ->
                 Val = proplists:get_value(Key, Frame#stomp_frame.headers)
@@ -234,43 +200,80 @@ t_subscribe(_) ->
         ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1),
 
         %% Unsubscribe
-        gen_tcp:send(
-            Sock,
-            serialize(
-                <<"UNSUBSCRIBE">>,
-                [
-                    {<<"id">>, 0},
-                    {<<"receipt">>, <<"12345">>}
-                ]
-            )
-        ),
-
-        {ok, Data2} = gen_tcp:recv(Sock, 0, 1000),
-
-        {ok,
-            #stomp_frame{
-                command = <<"RECEIPT">>,
-                headers = [{<<"receipt-id">>, <<"12345">>}],
-                body = _
-            },
-            _, _} = parse(Data2),
+        ok = send_unsubscribe_frame(Sock, 0),
+        ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
 
         %% assert subscription stats
         [ClientInfo2] = clients(),
         ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2),
 
-        gen_tcp:send(
-            Sock,
-            serialize(
-                <<"SEND">>,
-                [{<<"destination">>, <<"/queue/foo">>}],
-                <<"You will not receive this msg">>
-            )
-        ),
+        ok = send_message_frame(Sock, <<"/queue/foo">>, <<"You will not receive this msg">>),
+        ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
 
         {error, timeout} = gen_tcp:recv(Sock, 0, 500)
     end).
 
+t_subscribe_inuse(_) ->
+    UsedTopic = <<"/queue/foo">>,
+    UsedSubId = <<"0">>,
+    Setup =
+        fun(Sock) ->
+            ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>),
+            ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
+            ok = send_subscribe_frame(Sock, UsedSubId, UsedTopic),
+            ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock))
+        end,
+    TopicIdInuse =
+        fun(Sock) ->
+            Setup(Sock),
+            %% topic-id is in use
+            ok = send_subscribe_frame(Sock, UsedSubId, <<"/queue/bar">>),
+
+            {ok, ErrorFrame} = recv_a_frame(Sock),
+            ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
+            ?assertEqual(<<"Subscription id 0 is in used">>, ErrorFrame#stomp_frame.body),
+            ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0))
+        end,
+
+    SubscriptionInuse =
+        fun(Sock) ->
+            Setup(Sock),
+            %% topic is in use
+            ok = send_subscribe_frame(Sock, 1, UsedTopic),
+
+            {ok, ErrorFrame} = recv_a_frame(Sock),
+            ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
+            ?assertEqual(<<"Topic /queue/foo already in subscribed">>, ErrorFrame#stomp_frame.body),
+            ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0))
+        end,
+
+    TopicIdInuseViaHttp =
+        fun(Sock) ->
+            Setup(Sock),
+            %% assert subscription stats
+            [#{clientid := ClientId}] = clients(),
+            {error, ErrMsg} = create_subscription(ClientId, <<"/queue/bar">>, UsedSubId),
+            ?assertEqual(<<"Subscription id 0 is in used">>, ErrMsg),
+
+            ok = send_disconnect_frame(Sock)
+        end,
+
+    SubscriptionInuseViaHttp =
+        fun(Sock) ->
+            Setup(Sock),
+            %% assert subscription stats
+            [#{clientid := ClientId}] = clients(),
+            {error, ErrMsg} = create_subscription(ClientId, UsedTopic, <<"1">>),
+            ?assertEqual(<<"Topic /queue/foo already in subscribed">>, ErrMsg),
+
+            ok = send_disconnect_frame(Sock)
+        end,
+
+    with_connection(TopicIdInuse),
+    with_connection(SubscriptionInuse),
+    with_connection(TopicIdInuseViaHttp),
+    with_connection(SubscriptionInuseViaHttp).
+
 t_transaction(_) ->
     with_connection(fun(Sock) ->
         gen_tcp:send(
@@ -1072,6 +1075,7 @@ recv_a_frame(Sock) ->
         {ok, Frame, Rest, NParser} ->
             put(parser, NParser),
             put(rest, Rest),
+            ct:pal("recv_a_frame: ~p~n", [Frame]),
             {ok, Frame};
         {error, _} = Err ->
             erase(parser),
@@ -1124,11 +1128,23 @@ send_subscribe_frame(Sock, Id, Topic) ->
         ],
     ok = gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, Headers)).
 
+send_unsubscribe_frame(Sock, Id) when is_integer(Id) ->
+    Headers =
+        [
+            {<<"id">>, Id},
+            {<<"receipt">>, <<"rp-", (integer_to_binary(Id))/binary>>}
+        ],
+    gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>, Headers)).
+
 send_message_frame(Sock, Topic, Payload) ->
+    send_message_frame(Sock, Topic, Payload, []).
+
+send_message_frame(Sock, Topic, Payload, Headers0) ->
     Headers =
         [
             {<<"destination">>, Topic},
             {<<"receipt">>, <<"rp-", Topic/binary>>}
+            | Headers0
         ],
     ok = gen_tcp:send(Sock, serialize(<<"SEND">>, Headers, Payload)).
 
@@ -1142,3 +1158,17 @@ send_disconnect_frame(Sock, ReceiptId) ->
 clients() ->
     {200, Clients} = request(get, "/gateways/stomp/clients"),
     maps:get(data, Clients).
+
+create_subscription(ClientId, Topic, SubId) ->
+    Path = io_lib:format("/gateways/stomp/clients/~s/subscriptions", [ClientId]),
+    Body = #{
+        topic => Topic,
+        qos => 1,
+        sub_props => #{subid => SubId}
+    },
+    case request(post, Path, Body) of
+        {201, _} ->
+            ok;
+        {400, #{message := Message}} ->
+            {error, Message}
+    end.

+ 1 - 0
changes/ce/fix-11195.en.md

@@ -0,0 +1 @@
+Avoid to create duplicated subscription by HTTP API or client in Stomp gateway