Procházet zdrojové kódy

fix(ds-lts): respect special topic matching rules in LTS-based layouts

There are a couple of advantages doing it on such a low level:
1. It's possible to avoid scanning whole streams.
2. Topic is already matched against (compressed) topic filter on the
   layout level anyway, so it would be wasteful to add one more step of
   matching and filtering on higher levels.
Andrew Mayorov před 1 rokem
rodič
revize
b738dc917a

+ 18 - 8
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -571,14 +571,24 @@ trie_next_(Trie, State, Token) ->
 %% erlfmt-ignore
 %% erlfmt-ignore
 -spec emanating(trie(), state(), edge()) -> [{edge(), state()}].
 -spec emanating(trie(), state(), edge()) -> [{edge(), state()}].
 emanating(#trie{trie = Tab}, State, ?PLUS) ->
 emanating(#trie{trie = Tab}, State, ?PLUS) ->
-    ets:select(
-        Tab,
-        ets:fun2ms(
-            fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
-                {Edge, Next}
-            end
-        )
-    );
+    case State of
+        ?PREFIX ->
+            MS = ets:fun2ms(
+                fun(#trans{key = {?PREFIX, Edge}, next = Next}) when
+                    %% Exclude topics starting with `$':
+                    is_binary(Edge) andalso binary_part(Edge, 0, 1) =/= <<"$">>; is_atom(Edge)
+                ->
+                    {Edge, Next}
+                end
+            );
+        _ ->
+            MS = ets:fun2ms(
+                fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
+                    {Edge, Next}
+                end
+            )
+    end,
+    ets:select(Tab, MS);
 emanating(#trie{trie = Tab}, State, ?EOT) ->
 emanating(#trie{trie = Tab}, State, ?EOT) ->
     case ets:lookup(Tab, {State, ?EOT}) of
     case ets:lookup(Tab, {State, ?EOT}) of
         [#trans{next = Next}] -> [{?EOT, Next}];
         [#trans{next = Next}] -> [{?EOT, Next}];

+ 31 - 2
apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl

@@ -31,6 +31,8 @@
 
 
 -define(SHARD, shard(?FUNCTION_NAME)).
 -define(SHARD, shard(?FUNCTION_NAME)).
 
 
+-define(LTS_THRESHOLD, {simple, {20, 10}}).
+
 -define(DB_CONFIG(CONFIG), #{
 -define(DB_CONFIG(CONFIG), #{
     backend => builtin_local,
     backend => builtin_local,
     storage => ?config(layout, CONFIG),
     storage => ?config(layout, CONFIG),
@@ -47,9 +49,14 @@ init_per_group(Group, Config) ->
     LayoutConf =
     LayoutConf =
         case Group of
         case Group of
             reference ->
             reference ->
-                {emqx_ds_storage_reference, #{}};
+                {emqx_ds_storage_reference, #{
+                    lts_threshold_spec => ?LTS_THRESHOLD
+                }};
             skipstream_lts ->
             skipstream_lts ->
-                {emqx_ds_storage_skipstream_lts, #{with_guid => true}};
+                {emqx_ds_storage_skipstream_lts, #{
+                    with_guid => true,
+                    lts_threshold_spec => ?LTS_THRESHOLD
+                }};
             bitfield_lts ->
             bitfield_lts ->
                 {emqx_ds_storage_bitfield_lts, #{}}
                 {emqx_ds_storage_bitfield_lts, #{}}
         end,
         end,
@@ -292,6 +299,28 @@ t_replay(Config) ->
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ok.
     ok.
 
 
+t_replay_sys(_Config) ->
+    {Values1, Values2} = lists:split(5, lists:seq(0, 1000, 100)),
+    STopic1 = <<"$SYS/test/1/2">>,
+    ELTopic = <<"/test/">>,
+    Topics1 = [<<"g/test/1">>, <<"g/test/2">>, <<"/test/">>],
+    SBatch1 = [make_message(V, STopic1, bin(V)) || V <- Values1],
+    Batch1 = [make_message(V, Topic, bin(V)) || Topic <- Topics1, V <- Values1],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch1 ++ Batch1),
+    ?assert(check(?SHARD, <<"$SYS/test/#">>, 0, SBatch1)),
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1)),
+    check(?SHARD, <<"+/test/+/+">>, 0, []),
+    STopic2 = <<"$SYS/test/3/4">>,
+    Topics2 = [emqx_utils:format("~p/test/~p", [I, I]) || I <- lists:seq(1, 40)],
+    Batch2 = [make_message(V, Topic, bin(V)) || Topic <- Topics2 ++ [ELTopic], V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
+    SBatch2 = [make_message(V, STopic2, bin(V)) || V <- Values2],
+    ok = emqx_ds:store_batch(?FUNCTION_NAME, SBatch2),
+    ?assert(check(?SHARD, <<"$SYS/test/#">>, 0, SBatch1 ++ SBatch2)),
+    ?assert(check(?SHARD, <<"$SYS/test/+/4">>, 0, SBatch2)),
+    ?assert(check(?SHARD, <<"+/test/#">>, 0, Batch1 ++ Batch2)),
+    check(?SHARD, <<"+/test/+/+">>, 0, SBatch2).
+
 %% This testcase verifies poll functionality that doesn't involve events:
 %% This testcase verifies poll functionality that doesn't involve events:
 t_poll(Config) ->
 t_poll(Config) ->
     ?check_trace(
     ?check_trace(