Просмотр исходного кода

perf(ruleeng): employ `emqx_topic_index` to speed up topic matching

Andrew Mayorov 2 лет назад
Родитель
Сommit
47dfba4341

+ 1 - 0
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -109,6 +109,7 @@
 
 %% Tables
 -define(RULE_TAB, emqx_rule_engine).
+-define(RULE_TOPIC_INDEX, emqx_rule_engine_topic_index).
 
 %% Allowed sql function provider modules
 -define(DEFAULT_SQL_FUNC_PROVIDER, emqx_rule_funcs).

+ 47 - 21
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -176,7 +176,7 @@ create_rule(Params) ->
 
 create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
     case get_rule(RuleId) of
-        not_found -> parse_and_insert(Params, CreatedAt);
+        not_found -> with_parsed_rule(Params, CreatedAt, fun insert_rule/1);
         {ok, _} -> {error, already_exists}
     end.
 
@@ -185,18 +185,27 @@ update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
     case get_rule(RuleId) of
         not_found ->
             {error, not_found};
-        {ok, #{created_at := CreatedAt}} ->
-            parse_and_insert(Params, CreatedAt)
+        {ok, RulePrev = #{created_at := CreatedAt}} ->
+            with_parsed_rule(Params, CreatedAt, fun(Rule) -> update_rule(Rule, RulePrev) end)
     end.
 
 -spec delete_rule(RuleId :: rule_id()) -> ok.
 delete_rule(RuleId) when is_binary(RuleId) ->
-    gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL).
+    case get_rule(RuleId) of
+        not_found ->
+            ok;
+        {ok, Rule} ->
+            gen_server:call(?RULE_ENGINE, {delete_rule, Rule}, ?T_CALL)
+    end.
 
 -spec insert_rule(Rule :: rule()) -> ok.
 insert_rule(Rule) ->
     gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
 
+-spec update_rule(Rule :: rule(), RulePrev :: rule()) -> ok.
+update_rule(Rule, RulePrev) ->
+    gen_server:call(?RULE_ENGINE, {update_rule, Rule, RulePrev}, ?T_CALL).
+
 %%----------------------------------------------------------------------------------------
 %% Rule Management
 %%----------------------------------------------------------------------------------------
@@ -216,9 +225,8 @@ get_rules_ordered_by_ts() ->
 -spec get_rules_for_topic(Topic :: binary()) -> [rule()].
 get_rules_for_topic(Topic) ->
     [
-        Rule
-     || Rule = #{from := From} <- get_rules(),
-        emqx_topic:match_any(Topic, From)
+        emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX)
+     || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique])
     ].
 
 -spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
@@ -411,10 +419,17 @@ init([]) ->
     {ok, #{}}.
 
 handle_call({insert_rule, Rule}, _From, State) ->
-    do_insert_rule(Rule),
+    ok = do_insert_rule(Rule),
+    ok = do_update_rule_index(Rule),
+    {reply, ok, State};
+handle_call({update_rule, Rule, RulePrev}, _From, State) ->
+    ok = do_delete_rule_index(RulePrev),
+    ok = do_insert_rule(Rule),
+    ok = do_update_rule_index(Rule),
     {reply, ok, State};
 handle_call({delete_rule, Rule}, _From, State) ->
-    do_delete_rule(Rule),
+    ok = do_delete_rule_index(Rule),
+    ok = do_delete_rule(Rule),
     {reply, ok, State};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", request => Req}),
@@ -438,7 +453,7 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Functions
 %%----------------------------------------------------------------------------------------
 
-parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) ->
+with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt, Fun) ->
     case emqx_rule_sqlparser:parse(Sql) of
         {ok, Select} ->
             Rule = #{
@@ -459,7 +474,7 @@ parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat
                 conditions => emqx_rule_sqlparser:select_where(Select)
                 %% -- calculated fields end
             },
-            ok = insert_rule(Rule),
+            ok = Fun(Rule),
             {ok, Rule};
         {error, Reason} ->
             {error, Reason}
@@ -471,16 +486,27 @@ do_insert_rule(#{id := Id} = Rule) ->
     true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
     ok.
 
-do_delete_rule(RuleId) ->
-    case get_rule(RuleId) of
-        {ok, Rule} ->
-            ok = unload_hooks_for_rule(Rule),
-            ok = clear_metrics_for_rule(RuleId),
-            true = ets:delete(?RULE_TAB, RuleId),
-            ok;
-        not_found ->
-            ok
-    end.
+do_delete_rule(#{id := Id} = Rule) ->
+    ok = unload_hooks_for_rule(Rule),
+    ok = clear_metrics_for_rule(Id),
+    true = ets:delete(?RULE_TAB, Id),
+    ok.
+
+do_update_rule_index(#{id := Id, from := From} = Rule) ->
+    ok = lists:foreach(
+        fun(Topic) ->
+            true = emqx_topic_index:insert(Topic, Id, Rule, ?RULE_TOPIC_INDEX)
+        end,
+        From
+    ).
+
+do_delete_rule_index(#{id := Id, from := From}) ->
+    ok = lists:foreach(
+        fun(Topic) ->
+            true = emqx_topic_index:delete(Topic, Id, ?RULE_TOPIC_INDEX)
+        end,
+        From
+    ).
 
 parse_actions(Actions) ->
     [do_parse_action(Act) || Act <- Actions].

+ 1 - 0
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -26,6 +26,7 @@
 
 start(_Type, _Args) ->
     _ = ets:new(?RULE_TAB, [named_table, public, ordered_set, {read_concurrency, true}]),
+    _ = ets:new(?RULE_TOPIC_INDEX, [named_table, public, ordered_set, {read_concurrency, true}]),
     ok = emqx_rule_events:reload(),
     SupRet = emqx_rule_engine_sup:start_link(),
     ok = emqx_rule_engine:load_rules(),