瀏覽代碼

Merge pull request #14160 from keynslug/fix/EMQX-13229/special-topic-match-583

fix(ds-lts): respect special topic matching rules in LTS-based layouts
Andrew Mayorov 1 年之前
父節點
當前提交
4cd821972a

+ 10 - 2
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl

@@ -122,9 +122,17 @@ t_basic_crud(_Config) ->
     ),
 
     {ok, 201, #{<<"id">> := QueueID2}} = Resp2,
+    Resp3 = api_get(["durable_queues"]),
     ?assertMatch(
-        {ok, #{<<"data">> := [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}]}},
-        api_get(["durable_queues"])
+        {ok, #{<<"data">> := [#{<<"id">> := _}, #{<<"id">> := _}]}},
+        Resp3
+    ),
+    ?assertMatch(
+        [#{<<"id">> := QueueID1}, #{<<"id">> := QueueID2}],
+        begin
+            {ok, #{<<"data">> := Queues}} = Resp3,
+            lists:sort(emqx_utils_maps:key_comparer(<<"id">>), Queues)
+        end
     ),
 
     ?assertMatch(

+ 17 - 2
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -78,13 +78,14 @@
 %% Fixed size binary or integer, depending on the options:
 -type static_key() :: non_neg_integer() | binary().
 
-%% Trie root:
+%% Trie roots:
 -define(PREFIX, prefix).
+-define(PREFIX_SPECIAL, special).
 %% Special prefix root for reverse lookups:
 -define(rlookup, rlookup).
 -define(rlookup(STATIC), {?rlookup, STATIC}).
 
--type state() :: static_key() | ?PREFIX.
+-type state() :: static_key() | ?PREFIX | ?PREFIX_SPECIAL.
 
 -type varying() :: [level() | ?PLUS].
 
@@ -231,6 +232,13 @@ trie_copy_learned_paths(OldTrie, NewTrie) ->
 
 %% @doc Lookup the topic key. Create a new one, if not found.
 -spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key().
+topic_key(Trie, ThresholdFun, [<<"$", _/bytes>> | _] = Tokens) ->
+    %% [MQTT-4.7.2-1]
+    %% Put any topic starting with `$` into a separate _special_ root.
+    %% Using a special root only when the topic and the filter start with $<X>
+    %% prevents special topics from matching with + or # pattern, but not with
+    %% $<X>/+ or $<X>/# pattern. See also `match_topics/2`.
+    do_topic_key(Trie, ThresholdFun, 0, ?PREFIX_SPECIAL, Tokens, [], []);
 topic_key(Trie, ThresholdFun, Tokens) ->
     do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, [], []).
 
@@ -242,6 +250,13 @@ lookup_topic_key(Trie, Tokens) ->
 %% @doc Return list of keys of topics that match a given topic filter
 -spec match_topics(trie(), [level() | '+' | '#']) ->
     [msg_storage_key()].
+match_topics(Trie, [<<"$", _/bytes>> | _] = TopicFilter) ->
+    %% [MQTT-4.7.2-1]
+    %% Any topics starting with `$` should belong to a separate _special_ root.
+    %% Using a special root only when the topic and the filter start with $<X>
+    %% prevents special topics from matching with + or # pattern, but not with
+    %% $<X>/+ or $<X>/# pattern.
+    do_match_topics(Trie, ?PREFIX_SPECIAL, [], TopicFilter);
 match_topics(Trie, TopicFilter) ->
     do_match_topics(Trie, ?PREFIX, [], TopicFilter).
 

+ 47 - 2
apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl

@@ -31,6 +31,8 @@
 
 -define(SHARD, shard(?FUNCTION_NAME)).
 
+-define(LTS_THRESHOLD, {simple, {20, 10}}).
+
 -define(DB_CONFIG(CONFIG), #{
     backend => builtin_local,
     storage => ?config(layout, CONFIG),
@@ -47,9 +49,14 @@ init_per_group(Group, Config) ->
     LayoutConf =
         case Group of
             reference ->
-                {emqx_ds_storage_reference, #{}};
+                {emqx_ds_storage_reference, #{
+                    lts_threshold_spec => ?LTS_THRESHOLD
+                }};
             skipstream_lts ->
-                {emqx_ds_storage_skipstream_lts, #{with_guid => true}};
+                {emqx_ds_storage_skipstream_lts, #{
+                    with_guid => true,
+                    lts_threshold_spec => ?LTS_THRESHOLD
+                }};
             bitfield_lts ->
                 {emqx_ds_storage_bitfield_lts, #{}}
         end,
@@ -292,6 +299,44 @@ t_replay(Config) ->
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ok.
 
+t_replay_special_topics(_Config) ->
+    %% Verify that topic matching rules respect [MQTT-4.7.2-1]:
+    %% The Server MUST NOT match Topic Filters starting with a wildcard character (# or +)
+    %% with Topic Names beginning with a $ character.
+    {Values1, Values2} = lists:split(5, lists:seq(0, 1000, 100)),
+    STopic1 = <<"$SPECIAL/test/1/2">>,
+    ELTopic = <<"/test/">>,
+    Topics1 = [<<"g/test/1">>, <<"g/test/2">>, <<"/test/">>],
+    SBatch1 = [make_message(V, STopic1, bin(V)) || V <- Values1],
+    Batch1 = [make_message(V, Topic, bin(V)) || Topic <- Topics1, V <- Values1],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch1 ++ Batch1),
+    %% Expect special topic messages to show up only in `$SPECIAL/test/#` subscription:
+    ?assert(check(?SHARD, <<"$SPECIAL/test/#">>, 0, SBatch1)),
+    %% ...But not in an otherwise fitting wildcard subscriptions:
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1)),
+    check(?SHARD, <<"+/test/+/+">>, 0, []),
+    %% ...And not in different special roots:
+    check(?SHARD, <<"$SYS/test/#">>, 0, []),
+    %% Publish through a lot of similarly structured topic to let LTS "learn":
+    STopic2 = <<"$SPECIAL/test/3/4">>,
+    Topics2 = [emqx_utils:format("~p/test/~p", [I, I]) || I <- lists:seq(1, 40)],
+    Batch2 = [make_message(V, Topic, bin(V)) || Topic <- Topics2 ++ [ELTopic], V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
+    SBatch2 = [make_message(V, STopic2, bin(V)) || V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch2),
+    %% ...Then verify the same things:
+    ?assert(check(?SHARD, <<"$SPECIAL/test/#">>, 0, SBatch1 ++ SBatch2)),
+    ?assert(check(?SHARD, <<"$SPECIAL/test/+/4">>, 0, SBatch2)),
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1 ++ Batch2)),
+    check(?SHARD, <<"+/test/+/+">>, 0, SBatch2),
+    %% Also verify that having a lot of different $-roots does not break things:
+    STopics = [emqx_utils:format("$T~p/test/~p", [I, I]) || I <- lists:seq(1, 40)],
+    SBatch3 = [make_message(V, T, bin(V)) || T <- STopics, V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch3),
+    ?assert(check(?SHARD, <<"$T1/test/#">>, 0, SBatch3)),
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1 ++ Batch2)),
+    check(?SHARD, <<"$SYS/test/#">>, 0, []).
+
 %% This testcase verifies poll functionality that doesn't involve events:
 t_poll(Config) ->
     ?check_trace(

+ 1 - 0
changes/ce/fix-14160.en.md

@@ -0,0 +1 @@
+For durable session subscriptions, respect topic matching rules for durable topics starting with `$` symbol according to the MQTT specification.