Просмотр исходного кода

Add hooks only when creating the rules (#4160)

* refactor(rules): add hook only when creating rules

* fix(rule): update hooks after application restarted

* fix(rule): remove the extra guard
Shawn 5 лет назад
Родитель
Сommit
7778cd8623

+ 2 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -406,8 +406,9 @@ refresh_rules() ->
         end
     end, emqx_rule_registry:get_rules()).
 
-refresh_rule(#rule{id = RuleId, actions = Actions}) ->
+refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) ->
     ok = emqx_rule_metrics:create_rule_metrics(RuleId),
+    lists:foreach(fun emqx_rule_events:load/1, Topics),
     refresh_actions(Actions).
 
 -spec(refresh_resource_status() -> ok).

+ 1 - 8
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -24,8 +24,6 @@
 
 -export([stop/1]).
 
--define(APP, emqx_rule_engine).
-
 start(_Type, _Args) ->
     {ok, Sup} = emqx_rule_engine_sup:start_link(),
     _ = emqx_rule_engine_sup:start_locker(),
@@ -33,13 +31,8 @@ start(_Type, _Args) ->
     ok = emqx_rule_engine:refresh_resources(),
     ok = emqx_rule_engine:refresh_rules(),
     ok = emqx_rule_engine_cli:load(),
-    ok = emqx_rule_events:load(env()),
     {ok, Sup}.
 
 stop(_State) ->
-    ok = emqx_rule_events:unload(env()),
+    ok = emqx_rule_events:unload(),
     ok = emqx_rule_engine_cli:unload().
-
-env() ->
-    application:get_all_env(?APP)
-    .

+ 22 - 12
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -16,12 +16,14 @@
 
 -module(emqx_rule_events).
 
+-include("rule_engine.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 -logger_header("[RuleEvents]").
 
 -export([ load/1
+        , unload/0
         , unload/1
         , event_name/1
         , eventmsg_publish/1
@@ -60,16 +62,22 @@
         ]).
 -endif.
 
-load(Env) ->
-    lists:foreach(
-      fun(HookPoint) ->
-              ok = emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [hook_conf(HookPoint, Env)]})
-      end, ?SUPPORTED_HOOK).
+load(Topic) ->
+    HookPoint = event_name(Topic),
+    emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint),
+        [hook_conf(HookPoint, env())]}).
 
-unload(_Env) ->
-    [emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
-     || HookPoint <- ?SUPPORTED_HOOK],
-    ok.
+unload() ->
+    lists:foreach(fun(HookPoint) ->
+            emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
+        end, ?SUPPORTED_HOOK).
+
+unload(Topic) ->
+    HookPoint = event_name(Topic),
+    emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}).
+
+env() ->
+    application:get_all_env(?APP).
 
 %%--------------------------------------------------------------------
 %% Callbacks
@@ -574,17 +582,19 @@ reason(_) -> internal_error.
 
 ntoa(undefined) -> undefined;
 ntoa({IpAddr, Port}) ->
-    iolist_to_binary([inet:ntoa(IpAddr),":",integer_to_list(Port)]);
+    iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
 ntoa(IpAddr) ->
     iolist_to_binary(inet:ntoa(IpAddr)).
 
 event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
 event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
 event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
-event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed';
+event_name(<<"$events/session_unsubscribed", _/binary>>) ->
+    'session.unsubscribed';
 event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
 event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
-event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'.
+event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
+event_name(_) -> 'message.publish'.
 
 event_topic('client.connected') -> <<"$events/client_connected">>;
 event_topic('client.disconnected') -> <<"$events/client_disconnected">>;

+ 34 - 6
apps/emqx_rule_engine/src/emqx_rule_registry.erl

@@ -26,6 +26,7 @@
 %% Rule Management
 -export([ get_rules/0
         , get_rules_for/1
+        , get_rules_with_same_event/1
         , get_rule/1
         , add_rule/1
         , add_rules/1
@@ -91,6 +92,8 @@
         , {?RES_TAB, 'resources.count', 'resources.max'}
         ]).
 
+-define(T_CALL, 10000).
+
 %%------------------------------------------------------------------------------
 %% Mnesia bootstrap
 %%------------------------------------------------------------------------------
@@ -170,6 +173,15 @@ get_rules_for(Topic) ->
     [Rule || Rule = #rule{for = For} <- get_rules(),
              emqx_rule_utils:can_topic_match_oneof(Topic, For)].
 
+-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
+get_rules_with_same_event(Topic) ->
+    EventName = emqx_rule_events:event_name(Topic),
+    [Rule || Rule = #rule{for = For} <- get_rules(),
+             lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)].
+
+is_of_event_name(EventName, Topic) ->
+    EventName =:= emqx_rule_events:event_name(Topic).
+
 -spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found).
 get_rule(Id) ->
     case mnesia:dirty_read(?RULE_TAB, Id) of
@@ -179,22 +191,23 @@ get_rule(Id) ->
 
 -spec(add_rule(emqx_rule_engine:rule()) -> ok).
 add_rule(Rule) when is_record(Rule, rule) ->
-    trans(fun insert_rule/1, [Rule]).
+    add_rules([Rule]).
 
 -spec(add_rules(list(emqx_rule_engine:rule())) -> ok).
 add_rules(Rules) ->
-    trans(fun lists:foreach/2, [fun insert_rule/1, Rules]).
+    gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL).
 
 -spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok).
 remove_rule(RuleOrId) ->
-    trans(fun delete_rule/1, [RuleOrId]).
+    remove_rules([RuleOrId]).
 
 -spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok).
 remove_rules(Rules) ->
-    trans(fun lists:foreach/2, [fun delete_rule/1, Rules]).
+    gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
 
 %% @private
-insert_rule(Rule = #rule{}) ->
+insert_rule(Rule = #rule{for = Topics}) ->
+    lists:foreach(fun emqx_rule_events:load/1, Topics),
     mnesia:write(?RULE_TAB, Rule, write).
 
 %% @private
@@ -203,7 +216,14 @@ delete_rule(RuleId) when is_binary(RuleId) ->
         {ok, Rule} -> delete_rule(Rule);
         not_found -> ok
     end;
-delete_rule(Rule = #rule{}) when is_record(Rule, rule) ->
+delete_rule(Rule = #rule{id = Id, for = Topics}) ->
+    lists:foreach(fun(Topic) ->
+            case get_rules_with_same_event(Topic) of
+                [#rule{id = Id}] -> %% we are now deleting the last rule
+                    emqx_rule_events:unload(Topic);
+                _ -> ok
+            end
+        end, Topics),
     mnesia:delete_object(?RULE_TAB, Rule, write).
 
 %%------------------------------------------------------------------------------
@@ -391,6 +411,14 @@ init([]) ->
         {read_concurrency, true}]),
     {ok, #{}}.
 
+handle_call({add_rules, Rules}, _From, State) ->
+    trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
+    {reply, ok, State};
+
+handle_call({remove_rules, Rules}, _From, State) ->
+    trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
+    {reply, ok, State};
+
 handle_call(Req, _From, State) ->
     ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]),
     {reply, ignored, State}.

+ 34 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -79,6 +79,7 @@ groups() ->
        t_update_rule,
        t_get_rules_for,
        t_get_rules_for_2,
+       t_get_rules_with_same_event,
        t_add_get_remove_action,
        t_add_get_remove_actions,
        t_remove_actions_of,
@@ -714,6 +715,39 @@ t_get_rules_for_2(_Config) ->
     ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
     ok.
 
+t_get_rules_with_same_event(_Config) ->
+    PubT = <<"simple/1">>,
+    PubN = length(emqx_rule_registry:get_rules_with_same_event(PubT)),
+    ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>)),
+    ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>)),
+    ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>)),
+    ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>)),
+    ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>)),
+    ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>)),
+    ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>)),
+    ok = emqx_rule_registry:add_rules(
+            [make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
+             make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]),
+             make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]),
+             make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, [<<"$events/client_disconnected">>]),
+             make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, [<<"$events/session_subscribed">>]),
+             make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, [<<"$events/session_unsubscribed">>]),
+             make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, [<<"$events/message_delivered">>]),
+             make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, [<<"$events/message_acked">>]),
+             make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]),
+             make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>])
+             ]),
+    ?assertEqual(PubN + 3, length(emqx_rule_registry:get_rules_with_same_event(PubT))),
+    ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>))),
+    ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>))),
+    ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>))),
+    ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>))),
+    ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>))),
+    ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>))),
+    ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>))),
+    ok = emqx_rule_registry:remove_rules([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]),
+    ok.
+
 t_add_get_remove_action(_Config) ->
     ActionName0 = 'action-debug-0',
     Action0 = make_simple_action(ActionName0),