Просмотр исходного кода

feat(topicidx): iterate on trie search implementation

This improves matching performance and decreases GC pressure on
synthetic workloads.
Andrew Mayorov 2 лет назад
Родитель
Сommit
cf45e80c71
3 измененных файлов с 156 добавлено и 103 удалено
  1. 141 102
      apps/emqx/src/emqx_trie_search.erl
  2. 13 0
      apps/emqx/test/emqx_topic_index_SUITE.erl
  3. 2 1
      rebar.config.erl

+ 141 - 102
apps/emqx/src/emqx_trie_search.erl

@@ -110,16 +110,6 @@
 -type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)).
 -type opts() :: [unique | return_first].
 
-%% Holds the constant values of each search.
--record(ctx, {
-    %% A function which can quickly find the immediate-next record of the given prefix
-    nextf :: nextf(),
-    %% The initial words of a topic
-    words0 :: [word()],
-    %% Return as soon as there is one match found
-    return_first :: boolean()
-}).
-
 %% @doc Make a search-key for the given topic.
 -spec make_key(emqx_types:topic(), ID) -> key(ID).
 make_key(Topic, ID) when is_binary(Topic) ->
@@ -147,29 +137,29 @@ get_topic({Filter, _ID}) when is_list(Filter) ->
 get_topic({Topic, _ID}) ->
     Topic.
 
+-compile({inline, [base/1, move_up/2, match_add/2, compare/3]}).
+
 %% Make the base-key which can be used to locate the desired search target.
 base(Prefix) ->
     {Prefix, {}}.
 
+base_init([W = <<"$", _/bytes>> | _]) ->
+    base([W]);
+base_init(_) ->
+    base([]).
+
 %% Move the search target to the key next to the given Base.
-move_up(#ctx{nextf = NextF}, Base) ->
+move_up(NextF, Base) ->
     NextF(Base).
 
-%% Add the given key to the accumulation.
-add(#ctx{return_first = true}, _Acc, Key) ->
-    throw({return_first, Key});
-add(_C, Acc, Key) ->
-    match_add(Key, Acc).
-
 %% @doc Match given topic against the index and return the first match, or `false` if
 %% no match is found.
 -spec match(emqx_types:topic(), nextf()) -> false | key(_).
 match(Topic, NextF) ->
     try search(Topic, NextF, [return_first]) of
-        [] ->
-            false
+        _ -> false
     catch
-        throw:{return_first, Res} ->
+        throw:{first, Res} ->
             Res
     end.
 
@@ -182,110 +172,159 @@ matches(Topic, NextF, Opts) ->
 %% @doc Entrypoint of the search for a given topic.
 search(Topic, NextF, Opts) ->
     Words = words(Topic),
-    Context = #ctx{
-        nextf = NextF,
-        words0 = Words,
-        return_first = proplists:get_bool(return_first, Opts)
-    },
+    Base = base_init(Words),
+    ORetFirst = proplists:get_bool(return_first, Opts),
+    OUnique = proplists:get_bool(unique, Opts),
     Acc0 =
-        case proplists:get_bool(unique, Opts) of
+        case ORetFirst of
             true ->
+                first;
+            false when OUnique ->
                 #{};
             false ->
                 []
         end,
-    Base =
-        case hd(Words) of
-            <<$$, _/binary>> ->
-                %% skip all filters starts with # or +
-                base([hd(Words)]);
-            _ ->
-                base([])
+    Matches =
+        case search_new(Words, Base, NextF, Acc0) of
+            {Cursor, Acc} ->
+                match_topics(Topic, Cursor, NextF, Acc);
+            Acc ->
+                Acc
         end,
-    {MaybeEnd, Acc1} = search_new(Context, Base, Acc0),
-    Acc = match_topics(Context, Topic, MaybeEnd, Acc1),
-    case is_map(Acc) of
+    case is_map(Matches) of
         true ->
-            maps:values(Acc);
+            maps:values(Matches);
         false ->
-            Acc
+            Matches
     end.
 
 %% The recursive entrypoint of the trie-search algorithm.
 %% Always start from the initial prefix and words.
-search_new(#ctx{words0 = Words} = C, NewBase, Acc) ->
-    case move_up(C, NewBase) of
+search_new(Words0, NewBase, NextF, Acc) ->
+    case move_up(NextF, NewBase) of
         ?END ->
-            {?END, Acc};
-        {Filter, _} = T ->
-            search_plus(C, Words, Filter, [], T, Acc)
+            Acc;
+        Cursor ->
+            search_up(Words0, Cursor, NextF, Acc)
     end.
 
-%% Try to use '+' as the next word in the prefix.
-search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, T, Acc) ->
-    %% Directly append the current word to the matching prefix (RPrefix).
-    %% Micro optimization: try not to call the next clause because
-    %% it is not a continuation.
-    search_plus(C, [X | Words], [X | Filter], [W | RPrefix], T, Acc);
-search_plus(C, [W | Words], ['+' | _] = Filter, RPrefix, T, Acc) ->
-    case search_up(C, '+', Words, Filter, RPrefix, T, Acc) of
-        {T, Acc1} ->
-            search_up(C, W, Words, Filter, RPrefix, T, Acc1);
-        TargetMoved ->
-            TargetMoved
-    end;
-search_plus(C, [W | Words], Filter, RPrefix, T, Acc) ->
-    %% not a plus
-    search_up(C, W, Words, Filter, RPrefix, T, Acc).
-
 %% Search to the bigger end of ordered collection of topics and topic-filters.
-search_up(C, Word, Words, Filter, RPrefix, T, Acc) ->
-    case compare(Word, Filter, Words) of
-        {match, full} ->
-            search_new(C, T, add(C, Acc, T));
-        {match, prefix} ->
-            search_new(C, T, Acc);
+search_up(Words, {Filter, _} = Cursor, NextF, Acc) ->
+    case compare(Filter, Words, false) of
+        match_full ->
+            search_new(Words, Cursor, NextF, match_add(Cursor, Acc));
+        match_prefix ->
+            search_new(Words, Cursor, NextF, Acc);
         lower ->
-            {T, Acc};
-        higher ->
-            NewBase = base(lists:reverse([Word | RPrefix])),
-            search_new(C, NewBase, Acc);
-        shorter ->
-            search_plus(C, Words, tl(Filter), [Word | RPrefix], T, Acc)
+            {Cursor, Acc};
+        [SeekWord | FilterTail] ->
+            % NOTE
+            % This is a seek instruction.
+            % If we visualize the `Filter` as `FilterHead ++ [_] ++ FilterTail`, we need to
+            % seek to `FilterHead ++ [SeekWord]`. It carries the `FilterTail` because it's
+            % much cheaper to return it from `compare/3` than anything more usable.
+            NewBase = base(seek(SeekWord, Filter, FilterTail)),
+            search_new(Words, NewBase, NextF, Acc)
     end.
 
-%% Compare prefix word then the next words in suffix against the search-target
-%% topic or topic-filter.
-compare(_, NotFilter, _) when is_binary(NotFilter) ->
+seek(SeekWord, [_ | FilterTail], FilterTail) ->
+    [SeekWord];
+seek(SeekWord, [FilterWord | Rest], FilterTail) ->
+    [FilterWord | seek(SeekWord, Rest, FilterTail)].
+
+compare(NotFilter, _, _) when is_binary(NotFilter) ->
     lower;
-compare(H, [H | Filter], Words) ->
-    compare(Filter, Words);
-compare(_, ['#'], _Words) ->
-    {match, full};
-compare(H1, [H2 | _T2], _Words) when H1 < H2 ->
+compare([], [], _) ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/+/+/d
+    % We matched the topic to a topic filter exactly (possibly with pluses).
+    % We include it in the result set, and now need to try next entry in the table.
+    % Closest possible next entries that we must not miss:
+    % * a/+/+/d (same topic but a different ID)
+    % * a/+/+/d/# (also a match)
+    match_full;
+compare([], _Words, _) ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/+/c
+    % We found out that a topic filter is a prefix of the topic (possibly with pluses).
+    % We discard it, and now need to try next entry in the table.
+    % Closest possible next entries that we must not miss:
+    % * a/+/c/# (which is a match)
+    % * a/+/c/+ (also a match)
+    %
+    % TODO
+    % We might probably instead seek to a/+/c/# right away.
+    match_prefix;
+compare(['#'], _Words, _) ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/+/+/d/#
+    % We matched the topic to a topic filter with wildcard (possibly with pluses).
+    % We include it in the result set, and now need to try next entry in the table.
+    % Closest possible next entries that we must not miss:
+    % * a/+/+/d/# (same topic but a different ID)
+    match_full;
+compare(['+' | TF], [HW | TW], _PrevBacktrack) ->
+    % NOTE
+    % We need to keep backtrack point each time we encounter a plus. To safely skip over
+    % parts of the search space, we may need last backtrack point when recursion terminates.
+    % See next clauses for examples.
+    compare(TF, TW, [HW | TF]);
+compare([HW | TF], [HW | TW], Backtrack) ->
+    % NOTE
+    % Skip over the same word in both topic and filter, keeping the last backtrack point.
+    compare(TF, TW, Backtrack);
+compare([HF | _], [HW | _], false) when HF > HW ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/b/c/e/1
+    % The topic is lower than a topic filter. There's nowhere to backtrackto, we're out of
+    % the search space. We should stop the search.
     lower;
-compare(_H, [_ | _], _Words) ->
-    higher.
-
-%% Now compare the filter suffix and the topic suffix.
-compare([], []) ->
-    {match, full};
-compare([], _Words) ->
-    {match, prefix};
-compare(['#'], _Words) ->
-    {match, full};
-compare([_ | _], []) ->
+compare([HF | _], [HW | _], Backtrack) when HF > HW ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/+/+/e/1
+    % The topic is lower than a topic filter. There was a plus, last time at the 3rd level,
+    % we have a backtrack point to seek to:
+    % Seek: [c | e/1]
+    % We need to skip over part of search space, and seek to the next possible match:
+    % Next: a/+/c
+    Backtrack;
+compare([_ | _], [], false) ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/b/c/d/1
+    % The topic is lower than a topic filter. (since it's shorter). There's nowhere to
+    % backtrack to, we're out of the search space. We should stop the search.
     lower;
-compare([_ | _], _Words) ->
-    %% cannot know if it's a match, lower, or higher,
-    %% must search with a longer prefix.
-    shorter.
+compare([_ | _], [], Backtrack) ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/+/c/d/1
+    % The topic is lower than a topic filter. There was a plus, last and only time at the
+    % 3rd level, we have a backtrack point:
+    % Seek: [b | c/d/1]
+    % Next: a/b
+    Backtrack;
+compare([_HF | TF], [HW | _], _) ->
+    % NOTE
+    %  Topic: a/b/c/d
+    % Filter: a/+/+/0/1/2
+    % Topic is higher than the filter, we need to skip over to the next possible filter.
+    % Seek: [d | 0/1/2]
+    % Next: a/+/+/d
+    [HW | TF].
 
 match_add(K = {_Filter, ID}, Acc = #{}) ->
     % NOTE: ensuring uniqueness by record ID
     Acc#{ID => K};
-match_add(K, Acc) ->
-    [K | Acc].
+match_add(K, Acc) when is_list(Acc) ->
+    [K | Acc];
+match_add(K, first) ->
+    throw({first, K}).
 
 -spec words(emqx_types:topic()) -> [word()].
 words(Topic) when is_binary(Topic) ->
@@ -301,12 +340,12 @@ word(<<"#">>) -> '#';
 word(Bin) -> Bin.
 
 %% match non-wildcard topics
-match_topics(#ctx{nextf = NextF} = C, Topic, {Topic, _} = Key, Acc) ->
+match_topics(Topic, {Topic, _} = Key, NextF, Acc) ->
     %% found a topic match
-    match_topics(C, Topic, NextF(Key), add(C, Acc, Key));
-match_topics(#ctx{nextf = NextF} = C, Topic, {F, _}, Acc) when F < Topic ->
+    match_topics(Topic, NextF(Key), NextF, match_add(Key, Acc));
+match_topics(Topic, {F, _}, NextF, Acc) when F < Topic ->
     %% the last key is a filter, try jump to the topic
-    match_topics(C, Topic, NextF(base(Topic)), Acc);
-match_topics(_C, _Topic, _Key, Acc) ->
+    match_topics(Topic, NextF(base(Topic)), NextF, Acc);
+match_topics(_Topic, _Key, _NextF, Acc) ->
     %% gone pass the topic
     Acc.

+ 13 - 0
apps/emqx/test/emqx_topic_index_SUITE.erl

@@ -256,6 +256,19 @@ t_match_wildcard_edge_cases(Config) ->
     end,
     lists:foreach(F, Datasets).
 
+t_prop_edgecase(Config) ->
+    M = get_module(Config),
+    Tab = M:new(),
+    Topic = <<"01/01">>,
+    Filters = [
+        {1, <<>>},
+        {2, <<"+/01">>},
+        {3, <<>>},
+        {4, <<"+/+/01">>}
+    ],
+    _ = [M:insert(F, N, <<>>, Tab) || {N, F} <- Filters],
+    ?assertMatch([2], [id(X) || X <- matches(M, Topic, Tab, [unique])]).
+
 t_prop_matches(Config) ->
     M = get_module(Config),
     ?assert(

+ 2 - 1
rebar.config.erl

@@ -190,7 +190,8 @@ test_deps() ->
         {meck, "0.9.2"},
         {proper, "1.4.0"},
         {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}},
-        {erl_csv, "0.2.0"}
+        {erl_csv, "0.2.0"},
+        {eministat, "0.10.1"}
     ].
 
 common_compile_opts() ->