Преглед изворни кода

fix(stomp): ensure the subscripton_cnt timely updated

JianBo He пре 2 година
родитељ
комит
bad0c35bb9

+ 13 - 3
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -499,7 +499,7 @@ handle_in(
                 [{MountedTopic, SubOpts} | _] ->
                     NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs],
                     NChannel1 = NChannel#channel{subscriptions = NSubs},
-                    handle_out(receipt, receipt_id(Headers), NChannel1)
+                    handle_out_and_update(receipt, receipt_id(Headers), NChannel1)
             end;
         {error, ErrMsg, NChannel} ->
             ?SLOG(error, #{
@@ -541,7 +541,7 @@ handle_in(
             false ->
                 {ok, Channel}
         end,
-    handle_out(receipt, receipt_id(Headers), NChannel);
+    handle_out_and_update(receipt, receipt_id(Headers), NChannel);
 %% XXX: How to ack a frame ???
 handle_in(Frame = ?PACKET(?CMD_ACK, Headers), Channel) ->
     case header(<<"transaction">>, Headers) of
@@ -769,6 +769,12 @@ handle_out(receipt, ReceiptId, Channel) ->
     Frame = receipt_frame(ReceiptId),
     {ok, {outgoing, Frame}, Channel}.
 
+handle_out_and_update(receipt, undefined, Channel) ->
+    {ok, [{event, updated}], Channel};
+handle_out_and_update(receipt, ReceiptId, Channel) ->
+    Frame = receipt_frame(ReceiptId),
+    {ok, [{outgoing, Frame}, {event, updated}], Channel}.
+
 %%--------------------------------------------------------------------
 %% Handle call
 %%--------------------------------------------------------------------
@@ -812,7 +818,7 @@ handle_call(
                     ),
                     NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts} | Subs],
                     NChannel1 = NChannel#channel{subscriptions = NSubs},
-                    reply({ok, {MountedTopic, NSubOpts}}, NChannel1);
+                    reply({ok, {MountedTopic, NSubOpts}}, [{event, updated}], NChannel1);
                 {error, ErrMsg, NChannel} ->
                     ?SLOG(error, #{
                         msg => "failed_to_subscribe_topic",
@@ -841,6 +847,7 @@ handle_call(
     ),
     reply(
         ok,
+        [{event, updated}],
         Channel#channel{
             subscriptions = lists:keydelete(MountedTopic, 2, Subs)
         }
@@ -1107,6 +1114,9 @@ terminate(Reason, #channel{
 reply(Reply, Channel) ->
     {reply, Reply, Channel}.
 
+reply(Reply, Msgs, Channel) ->
+    {reply, Reply, Msgs, Channel}.
+
 shutdown(Reason, Channel) ->
     {shutdown, Reason, Channel}.
 

+ 16 - 0
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -256,6 +256,10 @@ t_subscribe(_) ->
             ]
         ),
 
+        %% assert subscription stats
+        [ClientInfo1] = clients(),
+        ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1),
+
         %% Unsubscribe
         gen_tcp:send(
             Sock,
@@ -278,6 +282,10 @@ t_subscribe(_) ->
             },
             _, _} = parse(Data2),
 
+        %% assert subscription stats
+        [ClientInfo2] = clients(),
+        ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2),
+
         gen_tcp:send(
             Sock,
             serialize(
@@ -802,10 +810,14 @@ t_rest_clienit_info(_) ->
 
         {200, Subs1} = request(get, ClientPath ++ "/subscriptions"),
         ?assertEqual(2, length(Subs1)),
+        {200, StompClient2} = request(get, ClientPath),
+        ?assertMatch(#{subscriptions_cnt := 2}, StompClient2),
 
         {204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"),
         {200, Subs2} = request(get, ClientPath ++ "/subscriptions"),
         ?assertEqual(1, length(Subs2)),
+        {200, StompClient3} = request(get, ClientPath),
+        ?assertMatch(#{subscriptions_cnt := 1}, StompClient3),
 
         %% kickout
         {204, _} = request(delete, ClientPath),
@@ -855,3 +867,7 @@ get_field(command, #stomp_frame{command = Command}) ->
     Command;
 get_field(body, #stomp_frame{body = Body}) ->
     Body.
+
+clients() ->
+    {200, Clients} = request(get, "/gateways/stomp/clients"),
+    maps:get(data, Clients).