瀏覽代碼

Merge pull request #10563 from qzhuyan/fix/william/no_local_filter_all

fix(mqtt): drop all local messages in session deliver
William Yang 2 年之前
父節點
當前提交
328add3a02
共有 3 個文件被更改,包括 41 次插入3 次删除
  1. 3 3
      apps/emqx/src/emqx_session.erl
  2. 36 0
      apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl
  3. 2 0
      changes/ce/fix-10563.en.md

+ 3 - 3
apps/emqx/src/emqx_session.erl

@@ -291,16 +291,16 @@ stats(Session) -> info(?STATS_KEYS, Session).
 
 ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
     Subs = info(subscriptions, Session),
-    lists:dropwhile(
+    lists:filter(
         fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
             case maps:find(Topic, Subs) of
                 {ok, #{nl := 1}} when Subscriber =:= Publisher ->
                     ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
                     ok = emqx_metrics:inc('delivery.dropped'),
                     ok = emqx_metrics:inc('delivery.dropped.no_local'),
-                    true;
+                    false;
                 _ ->
-                    false
+                    true
             end
         end,
         Delivers

+ 36 - 0
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -829,6 +829,42 @@ t_subscribe_no_local(Config) ->
     ?assertEqual(1, length(receive_messages(2))),
     ok = emqtt:disconnect(Client1).
 
+t_subscribe_no_local_mixed(Config) ->
+    ConnFun = ?config(conn_fun, Config),
+    Topic = nth(1, ?TOPICS),
+    {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
+    {ok, _} = emqtt:ConnFun(Client1),
+
+    {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
+    {ok, _} = emqtt:ConnFun(Client2),
+
+    %% Given tow clients and  client1 subscribe to topic with 'no local' set to true
+    {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
+
+    %% When mixed publish traffic are sent from both clients (Client1 sent 6 and Client2 sent 2)
+    CB = {fun emqtt:sync_publish_result/3, [self(), async_res]},
+    ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed1">>, 0, CB),
+    ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed2">>, 0, CB),
+    ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed3">>, 0, CB),
+    ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed4">>, 0, CB),
+    ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed5">>, 0, CB),
+    ok = emqtt:publish_async(Client2, Topic, <<"t_subscribe_no_local_mixed6">>, 0, CB),
+    ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed7">>, 0, CB),
+    ok = emqtt:publish_async(Client1, Topic, <<"t_subscribe_no_local_mixed8">>, 0, CB),
+    [
+        receive
+            {async_res, Res} -> ?assertEqual(ok, Res)
+        end
+     || _ <- lists:seq(1, 8)
+    ],
+
+    %% Then only two messages from clients 2 are received
+    PubRecvd = receive_messages(9),
+    ct:pal("~p", [PubRecvd]),
+    ?assertEqual(2, length(PubRecvd)),
+    ok = emqtt:disconnect(Client1),
+    ok = emqtt:disconnect(Client2).
+
 t_subscribe_actions(Config) ->
     ConnFun = ?config(conn_fun, Config),
     Topic = nth(1, ?TOPICS),

+ 2 - 0
changes/ce/fix-10563.en.md

@@ -0,0 +1,2 @@
+Corrected an issue where the no_local flag was not functioning correctly.
+