Просмотр исходного кода

fix(ds-lts): put special $ topics into a separate root instead

Andrew Mayorov 1 год назад
Родитель
Сommit
2b2513ef9b

+ 21 - 20
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -78,13 +78,14 @@
 %% Fixed size binary or integer, depending on the options:
 -type static_key() :: non_neg_integer() | binary().
 
-%% Trie root:
+%% Trie roots:
 -define(PREFIX, prefix).
+-define(PREFIX_SPECIAL, special).
 %% Special prefix root for reverse lookups:
 -define(rlookup, rlookup).
 -define(rlookup(STATIC), {?rlookup, STATIC}).
 
--type state() :: static_key() | ?PREFIX.
+-type state() :: static_key() | ?PREFIX | ?PREFIX_SPECIAL.
 
 -type varying() :: [level() | ?PLUS].
 
@@ -231,6 +232,11 @@ trie_copy_learned_paths(OldTrie, NewTrie) ->
 
 %% @doc Lookup the topic key. Create a new one, if not found.
 -spec topic_key(trie(), threshold_fun(), [level()]) -> msg_storage_key().
+topic_key(Trie, ThresholdFun, [<<"$", _/bytes>> | _] = Tokens) ->
+    %% [MQTT-4.7.2-1]
+    %% Put any topic starting with `$' into a separate root so they won't match
+    %% with `#' / `+/...' subscriptions.
+    do_topic_key(Trie, ThresholdFun, 0, ?PREFIX_SPECIAL, Tokens, [], []);
 topic_key(Trie, ThresholdFun, Tokens) ->
     do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, [], []).
 
@@ -242,6 +248,11 @@ lookup_topic_key(Trie, Tokens) ->
 %% @doc Return list of keys of topics that match a given topic filter
 -spec match_topics(trie(), [level() | '+' | '#']) ->
     [msg_storage_key()].
+match_topics(Trie, [<<"$", _/bytes>> | _] = TopicFilter) ->
+    %% [MQTT-4.7.2-1]
+    %% Any topics starting with `$' should belong to a separate root, match there
+    %% instead.
+    do_match_topics(Trie, ?PREFIX_SPECIAL, [], TopicFilter);
 match_topics(Trie, TopicFilter) ->
     do_match_topics(Trie, ?PREFIX, [], TopicFilter).
 
@@ -571,24 +582,14 @@ trie_next_(Trie, State, Token) ->
 %% erlfmt-ignore
 -spec emanating(trie(), state(), edge()) -> [{edge(), state()}].
 emanating(#trie{trie = Tab}, State, ?PLUS) ->
-    case State of
-        ?PREFIX ->
-            MS = ets:fun2ms(
-                fun(#trans{key = {?PREFIX, Edge}, next = Next}) when
-                    %% Exclude topics starting with `$':
-                    is_binary(Edge) andalso binary_part(Edge, 0, 1) =/= <<"$">>; is_atom(Edge)
-                ->
-                    {Edge, Next}
-                end
-            );
-        _ ->
-            MS = ets:fun2ms(
-                fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
-                    {Edge, Next}
-                end
-            )
-    end,
-    ets:select(Tab, MS);
+    ets:select(
+        Tab,
+        ets:fun2ms(
+            fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
+                {Edge, Next}
+            end
+        )
+    );
 emanating(#trie{trie = Tab}, State, ?EOT) ->
     case ets:lookup(Tab, {State, ?EOT}) of
         [#trans{next = Next}] -> [{?EOT, Next}];

+ 13 - 6
apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl

@@ -299,25 +299,32 @@ t_replay(Config) ->
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ok.
 
-t_replay_sys(_Config) ->
+t_replay_special_topics(_Config) ->
+    %% Verify that topic matching rules respect [MQTT-4.7.2-1]:
+    %% The Server MUST NOT match Topic Filters starting with a wildcard character (# or +)
+    %% with Topic Names beginning with a $ character.
     {Values1, Values2} = lists:split(5, lists:seq(0, 1000, 100)),
-    STopic1 = <<"$SYS/test/1/2">>,
+    STopic1 = <<"$SPECIAL/test/1/2">>,
     ELTopic = <<"/test/">>,
     Topics1 = [<<"g/test/1">>, <<"g/test/2">>, <<"/test/">>],
     SBatch1 = [make_message(V, STopic1, bin(V)) || V <- Values1],
     Batch1 = [make_message(V, Topic, bin(V)) || Topic <- Topics1, V <- Values1],
     ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch1 ++ Batch1),
-    ?assert(check(?SHARD, <<"$SYS/test/#">>, 0, SBatch1)),
+    %% Expect special topic messages to show up only in `$SPECIAL/test/#` subscription:
+    ?assert(check(?SHARD, <<"$SPECIAL/test/#">>, 0, SBatch1)),
+    %% ...But not in an otherwise fitting wildcard subscriptions:
     ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1)),
     check(?SHARD, <<"+/test/+/+">>, 0, []),
-    STopic2 = <<"$SYS/test/3/4">>,
+    %% Publish through a lot of similarly structured topic to let LTS "learn":
+    STopic2 = <<"$SPECIAL/test/3/4">>,
     Topics2 = [emqx_utils:format("~p/test/~p", [I, I]) || I <- lists:seq(1, 40)],
     Batch2 = [make_message(V, Topic, bin(V)) || Topic <- Topics2 ++ [ELTopic], V <- Values2],
     ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
     SBatch2 = [make_message(V, STopic2, bin(V)) || V <- Values2],
     ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch2),
-    ?assert(check(?SHARD, <<"$SYS/test/#">>, 0, SBatch1 ++ SBatch2)),
-    ?assert(check(?SHARD, <<"$SYS/test/+/4">>, 0, SBatch2)),
+    %% ...Then verify the same things:
+    ?assert(check(?SHARD, <<"$SPECIAL/test/#">>, 0, SBatch1 ++ SBatch2)),
+    ?assert(check(?SHARD, <<"$SPECIAL/test/+/4">>, 0, SBatch2)),
     ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1 ++ Batch2)),
     check(?SHARD, <<"+/test/+/+">>, 0, SBatch2).