Explorar el Código

Merge pull request #13742 from JimMoen/fix-pub-doller-prefixed-msg-to-sharp-sub

fix: topic matching when deliver retainer msg
JimMoen hace 1 año
padre
commit
775aa4a2c2

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.26"},
+    {vsn, "5.0.27"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx, emqx_ctl]},

+ 65 - 18
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -308,12 +308,13 @@ deliver(Messages, Pid, Topic, Limiter) ->
             no_receiver;
         _ ->
             BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
+            NMessages = filter_delivery(Messages, Topic),
             case BatchSize of
                 0 ->
-                    deliver_to_client(Messages, Pid, Topic),
+                    deliver_to_client(NMessages, Pid, Topic),
                     {ok, Limiter};
                 _ ->
-                    deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter)
+                    deliver_in_batches(NMessages, BatchSize, Pid, Topic, Limiter)
             end
     end.
 
@@ -334,25 +335,71 @@ deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
             Drop
     end.
 
-deliver_to_client([Msg | T], Pid, Topic) ->
-    _ =
-        case emqx_banned:check_clientid(Msg#message.from) of
-            false ->
-                Pid ! {deliver, Topic, Msg};
-            true ->
-                ?tp(
-                    notice,
-                    ignore_retained_message_deliver,
-                    #{
-                        reason => "client is banned",
-                        clientid => Msg#message.from
-                    }
-                )
-        end,
-    deliver_to_client(T, Pid, Topic);
+deliver_to_client([Msg | Rest], Pid, Topic) ->
+    Pid ! {deliver, Topic, Msg},
+    deliver_to_client(Rest, Pid, Topic);
 deliver_to_client([], _, _) ->
     ok.
 
+-define(DELIVER_ALLOWED, true).
+-define(DELIVER_NOT_ALLOWED, false).
+-define(publisher_client_banned, publisher_client_banned).
+-define(msg_topic_not_match, msg_topic_not_match).
+
+filter_delivery(Messages, Topic) ->
+    FilterFun =
+        fun(Msg) ->
+            Pipe = emqx_utils:pipeline(
+                [
+                    fun check_clientid_banned/2,
+                    fun 'check_prefixed_$_with_wildcard'/2
+                ],
+                {Msg, Topic},
+                ?DELIVER_NOT_ALLOWED
+            ),
+            _ =
+                case Pipe of
+                    {ok, _, ?DELIVER_ALLOWED} ->
+                        true;
+                    {error, _, _} ->
+                        false
+                end
+        end,
+    lists:filter(FilterFun, Messages).
+
+check_clientid_banned({Msg, _Topic} = Input, _) ->
+    case emqx_banned:check_clientid(Msg#message.from) of
+        false ->
+            {ok, Input, ?DELIVER_ALLOWED};
+        true ->
+            ?tp(
+                debug,
+                ignore_retained_message_due_to_banned,
+                #{
+                    reason => ?publisher_client_banned,
+                    clientid => Msg#message.from
+                }
+            ),
+            {error, ?publisher_client_banned, ?DELIVER_NOT_ALLOWED}
+    end.
+
+%% [MQTT-4.7.2-1]
+'check_prefixed_$_with_wildcard'({Msg, Topic} = Input, _) ->
+    case emqx_topic:match(Msg#message.topic, Topic) of
+        false ->
+            ?tp(
+                ignore_retained_message_due_to_topic_not_match,
+                #{
+                    reason => ?msg_topic_not_match,
+                    msg_topic => Msg#message.topic,
+                    subscribed_topic => Topic
+                }
+            ),
+            {error, ?msg_topic_not_match, ?DELIVER_NOT_ALLOWED};
+        true ->
+            {ok, Input, ?DELIVER_ALLOWED}
+    end.
+
 take(N, List) ->
     take(N, List, 0, []).
 

+ 167 - 26
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -45,23 +45,25 @@ groups() ->
 common_tests() ->
     emqx_common_test_helpers:all(?MODULE) -- [t_reindex].
 
--define(BASE_CONF, <<
-    "retainer {\n"
-    "    enable = true\n"
-    "    msg_clear_interval = 0s\n"
-    "    msg_expiry_interval = 0s\n"
-    "    max_payload_size = 1MB\n"
-    "    flow_control {\n"
-    "        batch_read_number = 0\n"
-    "        batch_deliver_number = 0\n"
-    "     }\n"
-    "   backend {\n"
-    "        type = built_in_database\n"
-    "        storage_type = ram\n"
-    "        max_retained_messages = 0\n"
-    "     }\n"
-    "}"
->>).
+%% erlfmt-ignore
+-define(BASE_CONF, <<"
+retainer {
+  enable = true
+  msg_clear_interval = 0s
+  msg_expiry_interval = 0s
+  max_payload_size = 1MB
+  delivery_rate = \"1000/s\"
+  flow_control {
+    batch_read_number = 0
+    batch_deliver_number = 0
+  }
+  backend {
+    type = built_in_database
+    storage_type = ram
+    max_retained_messages = 0
+  }
+}
+">>).
 
 %%--------------------------------------------------------------------
 %% Setups
@@ -99,9 +101,9 @@ init_per_testcase(_TestCase, Config) ->
     Config.
 
 end_per_testcase(t_flow_control, _Config) ->
-    restore_delivery();
+    reset_delivery_rate_to_default();
 end_per_testcase(t_cursor_cleanup, _Config) ->
-    restore_delivery();
+    reset_delivery_rate_to_default();
 end_per_testcase(_TestCase, _Config) ->
     ok.
 
@@ -248,6 +250,141 @@ t_wildcard_subscription(Config) ->
     emqtt:publish(C1, <<"/x/y/z">>, <<"">>, [{qos, 0}, {retain, true}]),
     ok = emqtt:disconnect(C1).
 
+'t_wildcard_no_$_prefix'(Config) ->
+    {ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C0),
+    emqtt:publish(
+        C0,
+        <<"$test/t/0">>,
+        <<"this is a retained message with $ prefix in topic">>,
+        [{qos, 0}, {retain, true}]
+    ),
+    emqtt:publish(
+        C0,
+        <<"$test/test/1">>,
+        <<"this is another retained message with $ prefix in topic">>,
+        [{qos, 0}, {retain, true}]
+    ),
+
+    emqtt:publish(
+        C0,
+        <<"t/1">>,
+        <<"this is a retained message 1">>,
+        [{qos, 0}, {retain, true}]
+    ),
+    emqtt:publish(
+        C0,
+        <<"t/2">>,
+        <<"this is a retained message 2">>,
+        [{qos, 0}, {retain, true}]
+    ),
+    publish(
+        C0,
+        <<"/t/3">>,
+        <<"this is a retained message 3">>,
+        [{qos, 0}, {retain, true}],
+        Config
+    ),
+
+    snabbkaffe:start_trace(),
+    SnabbkaffeSubFun = fun(NEvents) ->
+        snabbkaffe:subscribe(
+            ?match_event(#{?snk_kind := ignore_retained_message_due_to_topic_not_match}),
+            NEvents,
+            _Timeout = 10000,
+            0
+        )
+    end,
+    SnabbkaffeReceiveAndAssert = fun(SubRef, NEvents) ->
+        {ok, Trace} = snabbkaffe:receive_events(SubRef),
+        ?assertEqual(
+            NEvents, length(?of_kind(ignore_retained_message_due_to_topic_not_match, Trace))
+        )
+    end,
+
+    %%%%%%%%%%
+    %% C1 subscribes to `#'
+    {ok, SubRef1} = SnabbkaffeSubFun(2),
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"#">>, 0),
+    %% Matched 5 msgs but only receive 3 msgs, 2 ignored
+    %% (`$test/t/0` and `$test/test/1` with `$` prefix in topic are ignored)
+    SnabbkaffeReceiveAndAssert(SubRef1, 2),
+    Msgs1 = receive_messages(3),
+    ?assertMatch(
+        %% The order in which messages are received is not always the same as the order in which they are published.
+        %% The received order follows the order in which the indexes match.
+        %% i.e.
+        %%   The first level of the topic `/t/3` is empty.
+        %%   So it will be the first message that be matched and be sent.
+        [
+            #{topic := <<"/t/3">>},
+            #{topic := <<"t/1">>},
+            #{topic := <<"t/2">>}
+        ],
+        Msgs1,
+        #{msgs => Msgs1}
+    ),
+    ok = emqtt:disconnect(C1),
+
+    %%%%%%%%%%
+    %% C2 subscribes to `$test/#'
+    {ok, C2} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C2),
+    {ok, #{}, [0]} = emqtt:subscribe(C2, <<"$test/#">>, 0),
+    %% Matched 2 msgs and receive them all, no ignored
+    Msgs2 = receive_messages(2),
+    ?assertMatch(
+        [
+            #{topic := <<"$test/t/0">>},
+            #{topic := <<"$test/test/1">>}
+        ],
+        Msgs2,
+        #{msgs => Msgs2}
+    ),
+    ok = emqtt:disconnect(C2),
+
+    %%%%%%%%%%
+    %% C3 subscribes to `+/+'
+    {ok, C3} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C3),
+    {ok, #{}, [0]} = emqtt:subscribe(C3, <<"+/+">>, 0),
+    %% Matched 2 msgs and receive them all, no ignored
+    Msgs3 = receive_messages(2),
+    ?assertMatch(
+        [
+            #{topic := <<"t/1">>},
+            #{topic := <<"t/2">>}
+        ],
+        Msgs3,
+        #{msgs => Msgs3}
+    ),
+    ok = emqtt:disconnect(C3),
+
+    %%%%%%%%%%
+    %% C4 subscribes to `+/t/#'
+    {ok, SubRef4} = SnabbkaffeSubFun(1),
+    {ok, C4} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C4),
+    {ok, #{}, [0]} = emqtt:subscribe(C4, <<"+/t/#">>, 0),
+    %% Matched 2 msgs but only receive 1 msgs, 1 ignored
+    %% (`$test/t/0` with `$` prefix in topic are ignored)
+    SnabbkaffeReceiveAndAssert(SubRef4, 1),
+    Msgs4 = receive_messages(1),
+    ?assertMatch(
+        [
+            #{topic := <<"/t/3">>}
+        ],
+        Msgs4,
+        #{msgs => Msgs4}
+    ),
+    ok = emqtt:disconnect(C4),
+
+    snabbkaffe:stop(),
+
+    ok.
+
 t_message_expiry(Config) ->
     ConfMod = fun(Conf) ->
         Conf#{<<"delivery_rate">> := <<"infinity">>}
@@ -689,7 +826,7 @@ t_deliver_when_banned(_) ->
     snabbkaffe:start_trace(),
     {ok, SubRef} =
         snabbkaffe:subscribe(
-            ?match_event(#{?snk_kind := ignore_retained_message_deliver}),
+            ?match_event(#{?snk_kind := ignore_retained_message_due_to_banned}),
             _NEvents = 3,
             _Timeout = 10000,
             0
@@ -698,7 +835,7 @@ t_deliver_when_banned(_) ->
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
 
     {ok, Trace} = snabbkaffe:receive_events(SubRef),
-    ?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
+    ?assertEqual(3, length(?of_kind(ignore_retained_message_due_to_banned, Trace))),
     snabbkaffe:stop(),
     emqx_banned:delete(Who),
     {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
@@ -802,7 +939,10 @@ test_retain_while_reindexing(C, Deadline) ->
     end.
 
 receive_messages(Count) ->
-    receive_messages(Count, []).
+    lists:reverse(
+        receive_messages(Count, [])
+    ).
+
 receive_messages(0, Msgs) ->
     Msgs;
 receive_messages(Count, Msgs) ->
@@ -811,7 +951,7 @@ receive_messages(Count, Msgs) ->
             ct:log("Msg: ~p ~n", [Msg]),
             receive_messages(Count - 1, [Msg | Msgs]);
         Other ->
-            ct:log("Other Msg: ~p~n", [Other]),
+            ct:print("Other Msg: ~p~n", [Other]),
             receive_messages(Count, Msgs)
     after 2000 ->
         Msgs
@@ -914,13 +1054,14 @@ setup_slow_delivery() ->
             }
     }).
 
-restore_delivery() ->
+reset_delivery_rate_to_default() ->
     emqx_limiter_server:del_bucket(emqx_retainer, internal),
     emqx_retainer:update_config(#{
+        <<"delivery_rate">> => <<"1000/s">>,
         <<"flow_control">> =>
             #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1
+                <<"batch_read_number">> => 0,
+                <<"batch_deliver_number">> => 0
             }
     }).
 

+ 1 - 0
changes/ce/fix-13742.en.md

@@ -0,0 +1 @@
+Fixed when subscribing with `+` as the first level, or `#` as a wildcard, retained messages with topics starting with `$` are incorrectly received.