Parcourir la source

fix(rule_engine): load hooks to all the nodes in cluster

Shawn il y a 4 ans
Parent
commit
b5edba7729

+ 16 - 0
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -154,6 +154,22 @@
             end
         end()).
 
+-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
+
+-define(CLUSTER_CALL(Func, Args, ResParttern),
+    fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of
+        {ResL, []} ->
+            case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
+                [] -> ResL;
+                ErrL ->
+                    ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
+                    throw({Func, ErrL})
+            end;
+        {ResL, BadNodes} ->
+            ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
+            throw({Func, {failed_on_nodes, BadNodes}})
+   end end()).
+
 %% Tables
 -define(RULE_TAB, emqx_rule).
 -define(ACTION_TAB, emqx_rule_action).

+ 19 - 29
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -215,7 +215,7 @@ delete_rule(RuleId) ->
     case emqx_rule_registry:get_rule(RuleId) of
         {ok, Rule = #rule{actions = Actions}} ->
             try
-                cluster_call(clear_rule, [Rule]),
+                _ = ?CLUSTER_CALL(clear_rule, [Rule]),
                 ok = emqx_rule_registry:remove_rule(Rule)
             catch
                 Error:Reason:ST ->
@@ -241,7 +241,7 @@ create_resource(#{type := Type, config := Config0} = Params) ->
             ok = emqx_rule_registry:add_resource(Resource),
             %% Note that we will return OK in case of resource creation failure,
             %% A timer is started to re-start the resource later.
-            catch cluster_call(init_resource, [M, F, ResId, Config]),
+            catch _ = ?CLUSTER_CALL(init_resource, [M, F, ResId, Config]),
             {ok, Resource};
         not_found ->
             {error, {resource_type_not_found, Type}}
@@ -289,15 +289,14 @@ do_update_resource(#{id := Id, type := Type, description := NewDescription, conf
             Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
             case test_resource(#{type => Type, config => NewConfig}) of
                 ok ->
-                    Resource = #resource{
+                    _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]),
+                    emqx_rule_registry:add_resource(#resource{
                         id = Id,
                         type = Type,
                         config = Config,
                         description = NewDescription,
                         created_at = erlang:system_time(millisecond)
-                    },
-                    cluster_call(init_resource, [Module, Create, Id, Config]),
-                    emqx_rule_registry:add_resource(Resource);
+                    });
                {error, Reason} ->
                     error({error, Reason})
             end
@@ -328,8 +327,9 @@ test_resource(#{type := Type, config := Config0}) ->
             Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
             ResId = resource_id(),
             try
-                cluster_call(init_resource, [ModC, Create, ResId, Config]),
-                cluster_call(clear_resource, [ModD, Destroy, ResId])
+                _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]),
+                _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
+                ok
             catch
                 throw:Reason -> {error, Reason}
             end;
@@ -366,7 +366,8 @@ delete_resource(ResId) ->
                 = emqx_rule_registry:find_resource_type(ResType),
             try
                 ok = emqx_rule_registry:remove_resource(ResId),
-                cluster_call(clear_resource, [ModD, Destroy, ResId])
+                _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]),
+                ok
             catch
                 throw:Reason -> {error, Reason}
             end;
@@ -430,7 +431,13 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) ->
         {ok, #action{module = Mod, on_create = Create, params_spec = ParamSpec}} ->
             Args = emqx_rule_validator:validate_params(Args0, ParamSpec),
             ActionInstId = maps:get(id, Action, action_instance_id(Name)),
-            NeedInit andalso cluster_call(init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]),
+            case NeedInit of
+                true ->
+                    _ = ?CLUSTER_CALL(init_action, [Mod, Create, ActionInstId,
+                            with_resource_params(Args)]),
+                    ok;
+                false -> ok
+            end,
             #action_instance{
                 id = ActionInstId, name = Name, args = Args,
                 fallbacks = prepare_actions(maps:get(fallbacks, Action, []), NeedInit)
@@ -481,7 +488,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
 may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) ->
     %% prepare new actions before removing old ones
     NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)),
-    cluster_call(clear_actions, [OldActions]),
+    _ = ?CLUSTER_CALL(clear_actions, [OldActions]),
     may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
 may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
     Rule.
@@ -512,20 +519,6 @@ gen_id(Prefix, TestFun) ->
 action_instance_id(ActionName) ->
     iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]).
 
-cluster_call(Func, Args) ->
-    case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 5000) of
-        {ResL, []} ->
-            case lists:filter(fun(ok) -> false; (_) -> true end, ResL) of
-                [] -> ok;
-                ErrL ->
-                    ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
-                    throw({func_fail(Func), ErrL})
-            end;
-        {ResL, BadNodes} ->
-            ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
-            throw({func_fail(Func), {failed_on_nodes, BadNodes}})
-   end.
-
 init_resource(Module, OnCreate, ResId, Config) ->
     Params = ?RAISE(Module:OnCreate(ResId, Config),
         {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
@@ -642,15 +635,12 @@ refresh_actions(Actions, Pred) ->
                 true ->
                     {ok, #action{module = Mod, on_create = Create}}
                         = emqx_rule_registry:find_action(ActName),
-                    cluster_call(init_action, [Mod, Create, Id, with_resource_params(Args)]),
+                    _ = ?CLUSTER_CALL(init_action, [Mod, Create, Id, with_resource_params(Args)]),
                     refresh_actions(Fallbacks, Pred);
                 false -> ok
             end
         end, Actions).
 
-func_fail(Func) when is_atom(Func) ->
-    list_to_atom(atom_to_list(Func) ++ "_failure").
-
 find_type(ResId) ->
     {ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId),
     {ok, Type}.

+ 2 - 3
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -273,14 +273,13 @@ do_create_resource(Create, ParsedParams) ->
             return({ok, record_to_map(Resource)});
         {error, {resource_type_not_found, Type}} ->
             return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
-        {error, {init_resource_failure, _}} ->
+        {error, {init_resource, _}} ->
             return({error, 500, <<"Init resource failure!">>});
         {error, Reason} ->
             ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
             return({error, 400, ?ERR_BADARGS(Reason)})
     end.
 
-
 list_resources(#{}, _Params) ->
     Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
     Data = lists:map(fun(Res = #{id := Id}) ->
@@ -345,7 +344,7 @@ update_resource(#{id := Id}, NewParams) ->
         {error, not_found} ->
             ?LOG(error, "Resource not found: ~0p", [Id]),
             return({error, 400, <<"Resource not found:", Id/binary>>});
-        {error, {init_resource_failure, _}} ->
+        {error, {init_resource, _}} ->
             ?LOG(error, "Init resource failure: ~0p", [Id]),
             return({error, 500, <<"Init resource failure:", Id/binary>>});
         {error, {dependency_exists, RuleId}} ->

+ 15 - 5
apps/emqx_rule_engine/src/emqx_rule_registry.erl

@@ -67,6 +67,10 @@
         , unregister_resource_types_of/1
         ]).
 
+-export([ load_hooks_for_rule/1
+        , unload_hooks_for_rule/1
+        ]).
+
 %% for debug purposes
 -export([dump/0]).
 
@@ -216,8 +220,8 @@ remove_rules(Rules) ->
     gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
 
 %% @private
-insert_rule(Rule = #rule{for = Topics}) ->
-    lists:foreach(fun emqx_rule_events:load/1, Topics),
+insert_rule(Rule) ->
+    _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
     mnesia:write(?RULE_TAB, Rule, write).
 
 %% @private
@@ -226,15 +230,21 @@ delete_rule(RuleId) when is_binary(RuleId) ->
         {ok, Rule} -> delete_rule(Rule);
         not_found -> ok
     end;
-delete_rule(Rule = #rule{id = Id, for = Topics}) ->
+delete_rule(Rule) ->
+    _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
+    mnesia:delete_object(?RULE_TAB, Rule, write).
+
+load_hooks_for_rule(#rule{for = Topics}) ->
+    lists:foreach(fun emqx_rule_events:load/1, Topics).
+
+unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
     lists:foreach(fun(Topic) ->
             case get_rules_with_same_event(Topic) of
                 [#rule{id = Id}] -> %% we are now deleting the last rule
                     emqx_rule_events:unload(Topic);
                 _ -> ok
             end
-        end, Topics),
-    mnesia:delete_object(?RULE_TAB, Rule, write).
+        end, Topics).
 
 %%------------------------------------------------------------------------------
 %% Action Management