Browse Source

fix(emqx_trie): performance issue when many levels

Zaiming Shi 4 years ago
parent
commit
3c03047c9f
3 changed files with 56 additions and 21 deletions
  1. 2 0
      src/emqx.appup.src
  2. 30 20
      src/emqx_trie.erl
  3. 24 1
      test/emqx_trie_SUITE.erl

+ 2 - 0
src/emqx.appup.src

@@ -5,6 +5,7 @@
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
+     {load_module, emqx_trie, brutal_purge, soft_purge, []},
      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
      {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
      {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
    ]},
    ]},
@@ -15,6 +16,7 @@
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
+     {load_module, emqx_trie, brutal_purge, soft_purge, []},
      %% Just load the module. We don't need to change the 'messages.retained'
      %% Just load the module. We don't need to change the 'messages.retained'
      %% and 'messages.retained' counter type.
      %% and 'messages.retained' counter type.
      {load_module, emqx_metrics, brutal_purge, soft_purge, []}
      {load_module, emqx_metrics, brutal_purge, soft_purge, []}

+ 30 - 20
src/emqx_trie.erl

@@ -219,15 +219,22 @@ do_match(Words) ->
     do_match(Words, empty).
     do_match(Words, empty).
 
 
 do_match(Words, Prefix) ->
 do_match(Words, Prefix) ->
-    match(is_compact(), Words, Prefix, []).
+    case is_compact() of
+        true -> match_compact(Words, Prefix, []);
+        false -> match_no_compact(Words, Prefix, [])
+    end.
 
 
-match(_IsCompact, [], Topic, Acc) ->
+match_no_compact([], Topic, Acc) ->
     'match_#'(Topic) ++ %% try match foo/bar/#
     'match_#'(Topic) ++ %% try match foo/bar/#
     lookup_topic(Topic) ++ %% try match foo/bar
     lookup_topic(Topic) ++ %% try match foo/bar
     Acc;
     Acc;
-match(IsCompact, [Word | Words], Prefix, Acc0) ->
-    case {has_prefix(Prefix), IsCompact} of
-        {false, false} ->
+match_no_compact([Word | Words], Prefix, Acc0) ->
+    case has_prefix(Prefix) of
+        true ->
+            Acc1 = 'match_#'(Prefix) ++ Acc0,
+            Acc = match_no_compact(Words, join(Prefix, '+'), Acc1),
+            match_no_compact(Words, join(Prefix, Word), Acc);
+        false ->
             %% non-compact paths in database
             %% non-compact paths in database
             %% if there is no prefix matches the current topic prefix
             %% if there is no prefix matches the current topic prefix
             %% we can simpliy return from here
             %% we can simpliy return from here
@@ -240,21 +247,24 @@ match(IsCompact, [Word | Words], Prefix, Acc0) ->
             %% then at the second level, we lookup prefix a/x,
             %% then at the second level, we lookup prefix a/x,
             %% no such prefix to be found, meaning there is no point
             %% no such prefix to be found, meaning there is no point
             %% searching for 'a/x/y', 'a/x/+' or 'a/x/#'
             %% searching for 'a/x/y', 'a/x/+' or 'a/x/#'
-            Acc0;
-        _ ->
-            %% compact paths in database
-            %% we have to enumerate all possible prefixes
-            %% e.g. a/+/b/# results with below entries in database
-            %%   - a/+
-            %%   - a/+/b/#
-            %% when matching a/x/y, we need to enumerate
-            %%   - a
-            %%   - a/x
-            %%   - a/x/y
-            %% *with '+', '#' replaced at each level
-            Acc1 = 'match_#'(Prefix) ++ Acc0,
-            Acc = match(IsCompact, Words, join(Prefix, '+'), Acc1),
-            match(IsCompact, Words, join(Prefix, Word), Acc)
+            Acc0
+    end.
+
+match_compact([], Topic, Acc) ->
+    'match_#'(Topic) ++ %% try match foo/bar/#
+    lookup_topic(Topic) ++ %% try match foo/bar
+    Acc;
+match_compact([Word | Words], Prefix, Acc0) ->
+    Acc1 = 'match_#'(Prefix) ++ Acc0,
+    Acc = match_compact(Words, join(Prefix, Word), Acc1),
+    WildcardPrefix = join(Prefix, '+'),
+    %% go deeper to match current_prefix/+ only when:
+    %% 1. current word is the last
+    %% OR
+    %% 2. there is a prefix = 'current_prefix/+'
+    case Words =:= [] orelse has_prefix(WildcardPrefix) of
+        true -> match_compact(Words, WildcardPrefix, Acc);
+        false -> Acc
     end.
     end.
 
 
 'match_#'(Prefix) ->
 'match_#'(Prefix) ->

+ 24 - 1
test/emqx_trie_SUITE.erl

@@ -105,7 +105,10 @@ t_match3(_) ->
     Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
     Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
     trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end),
     trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end),
     Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]),
     Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]),
-    ?assertEqual(4, length(Matched)),
+    case length(Matched) of
+        4 -> ok;
+        _ -> error({unexpected, Matched})
+    end,
     SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>),
     SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>),
     ?assertEqual([<<"$SYS/#">>], SysMatched).
     ?assertEqual([<<"$SYS/#">>], SysMatched).
 
 
@@ -114,6 +117,26 @@ t_match4(_) ->
     trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
     trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
     ?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))).
     ?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))).
 
 
+t_match5(_) ->
+    T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
+    Topics = [<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>],
+    trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end),
+    ?assertEqual([<<"#">>, <<T/binary, "/#">>], lists:sort(emqx_trie:match(T))),
+    ?assertEqual([<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>],
+                 lists:sort(emqx_trie:match(<<T/binary, "/1">>))).
+
+t_match6(_) ->
+    T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
+    W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>,
+    trans(fun() -> emqx_trie:insert(W) end),
+    ?assertEqual([W], emqx_trie:match(T)).
+
+t_match7(_) ->
+    T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
+    W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>,
+    trans(fun() -> emqx_trie:insert(W) end),
+    ?assertEqual([W], emqx_trie:match(T)).
+
 t_empty(_) ->
 t_empty(_) ->
     ?assert(?TRIE:empty()),
     ?assert(?TRIE:empty()),
     trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),
     trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),