Просмотр исходного кода

feat(rules): update rule_engine configs from APIs

Shawn 4 лет назад
Родитель
Сommit
9c7eef5295

+ 8 - 14
apps/emqx/src/emqx_config_handler.erl

@@ -41,13 +41,6 @@
 -define(MOD, {mod}).
 -define(WKEY, '?').
 
--define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
-    try [safe_atom(Key) || Key <- PATH] of
-        AtomKeyPath -> EXP
-    catch
-        error:badarg -> EXP_ON_FAIL
-    end).
-
 -type handler_name() :: module().
 -type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
 
@@ -76,8 +69,9 @@ stop() ->
 -spec update_config(module(), emqx_config:config_key_path(), emqx_config:update_args()) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
-    ?ATOM_CONF_PATH(ConfKeyPath, gen_server:call(?MODULE, {change_config, SchemaModule,
-        AtomKeyPath, UpdateArgs}), {error, {not_found, ConfKeyPath}}).
+    %% force covert the path to a list of atoms, as there maybe some wildcard names/ids in the path
+    AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
+    gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}).
 
 -spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
 add_handler(ConfKeyPath, HandlerName) ->
@@ -310,9 +304,9 @@ bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(B) when is_binary(B) -> B.
 
-safe_atom(Bin) when is_binary(Bin) ->
-    binary_to_existing_atom(Bin, latin1);
-safe_atom(Str) when is_list(Str) ->
-    list_to_existing_atom(Str);
-safe_atom(Atom) when is_atom(Atom) ->
+atom(Bin) when is_binary(Bin) ->
+    binary_to_atom(Bin, utf8);
+atom(Str) when is_list(Str) ->
+    list_to_atom(Str);
+atom(Atom) when is_atom(Atom) ->
     Atom.

+ 1 - 0
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -27,6 +27,7 @@ start(_StartType, _StartArgs) ->
     {ok, Sup}.
 
 stop(_State) ->
+    emqx_config_handler:remove_handler(emqx_bridge:config_key_path()),
     ok = emqx_bridge:unload_hook(),
     ok.
 

+ 34 - 7
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -17,6 +17,7 @@
 -module(emqx_rule_engine).
 
 -behaviour(gen_server).
+-behaviour(emqx_config_handler).
 
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -24,6 +25,10 @@
 
 -export([start_link/0]).
 
+-export([ post_config_update/4
+        , config_key_path/0
+        ]).
+
 %% Rule Management
 
 -export([ load_rules/0
@@ -66,14 +71,33 @@
 
 -define(T_CALL, 10000).
 
-%%------------------------------------------------------------------------------
-%% Start the gen_server
-%%------------------------------------------------------------------------------
+-define(FOREACH_RULE(RULES, EXPR),
+    lists:foreach(fun({ID0, _ITEM}) ->
+            ID = bin(ID0),
+            EXPR
+        end, maps:to_list(RULES))).
+
+config_key_path() ->
+    [rule_engine, rules].
 
 -spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}).
 start_link() ->
     gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
 
+%%------------------------------------------------------------------------------
+%% The config handler for emqx_rule_engine
+%%------------------------------------------------------------------------------
+post_config_update(_Req, NewRules, OldRules, _AppEnvs) ->
+    #{added := Added, removed := Removed, changed := Updated}
+        = emqx_map_lib:diff_maps(NewRules, OldRules),
+    ?FOREACH_RULE(Updated, begin
+        {_Old, New} = _ITEM,
+        {ok, _} = update_rule(New#{id => ID})
+    end),
+    ?FOREACH_RULE(Removed, ok = delete_rule(ID)),
+    ?FOREACH_RULE(Added, {ok, _} = create_rule(_ITEM#{id => ID})),
+    {ok, get_rules()}.
+
 %%------------------------------------------------------------------------------
 %% APIs for rules
 %%------------------------------------------------------------------------------
@@ -81,23 +105,23 @@ start_link() ->
 -spec load_rules() -> ok.
 load_rules() ->
     lists:foreach(fun({Id, Rule}) ->
-            {ok, _} = create_rule(Rule#{id => Id})
+            {ok, _} = create_rule(Rule#{id => bin(Id)})
         end, maps:to_list(emqx:get_config([rule_engine, rules], #{}))).
 
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
-create_rule(Params = #{id := RuleId}) ->
+create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
     case get_rule(RuleId) of
         not_found -> do_create_rule(Params);
         {ok, _} -> {error, {already_exists, RuleId}}
     end.
 
 -spec update_rule(map()) -> {ok, rule()} | {error, term()}.
-update_rule(Params = #{id := RuleId}) ->
+update_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
     ok = delete_rule(RuleId),
     do_create_rule(Params).
 
 -spec(delete_rule(RuleId :: rule_id()) -> ok).
-delete_rule(RuleId) ->
+delete_rule(RuleId) when is_binary(RuleId) ->
     gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL).
 
 -spec(insert_rule(Rule :: rule()) -> ok).
@@ -259,3 +283,6 @@ parse_output_func(Func) when is_function(Func) ->
 
 get_all_records(Tab) ->
     [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].
+
+bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
+bin(B) when is_binary(B) -> B.

+ 32 - 16
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -245,13 +245,22 @@ crud_rules(get, _Params) ->
     Records = emqx_rule_engine:get_rules_ordered_by_ts(),
     {200, format_rule_resp(Records)};
 
-crud_rules(post, #{body := Params}) ->
-    ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:create_rule(CheckedParams) of
-        {ok, Rule} -> {201, format_rule_resp(Rule)};
-        {error, Reason} ->
-            ?SLOG(error, #{msg => "create_rule_failed", reason => Reason}),
-            {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
-    end).
+crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
+    ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
+    case emqx_rule_engine:get_rule(Id) of
+        {ok, _Rule} ->
+            {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}};
+        not_found ->
+            case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
+                {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
+                    [Rule] = [R || R = #{id := Id0} <- AllRules, Id0 == Id],
+                    {201, format_rule_resp(Rule)};
+                {error, Reason} ->
+                    ?SLOG(error, #{msg => "create_rule_failed",
+                                   id => Id, reason => Reason}),
+                    {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
+            end
+    end.
 
 rule_test(post, #{body := Params}) ->
     ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of
@@ -267,20 +276,27 @@ crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
             {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
     end;
 
-crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params0}) ->
-    Params = maps:merge(Params0, #{id => Id}),
-    ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:update_rule(CheckedParams) of
-        {ok, Rule} -> {200, format_rule_resp(Rule)};
+crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
+    ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
+    case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
+        {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
+            [Rule] = [R || R = #{id := Id0} <- AllRules, Id0 == Id],
+            {200, format_rule_resp(Rule)};
         {error, Reason} ->
             ?SLOG(error, #{msg => "update_rule_failed",
-                           id => Id,
-                           reason => Reason}),
+                           id => Id, reason => Reason}),
             {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
-    end);
+    end;
 
 crud_rules_by_id(delete, #{bindings := #{id := Id}}) ->
-    ok = emqx_rule_engine:delete_rule(Id),
-    {200}.
+    ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
+    case emqx:remove_config(ConfPath, #{}) of
+        {ok, _} -> {200};
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "delete_rule_failed",
+                           id => Id, reason => Reason}),
+            {500, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
+    end.
 
 %%------------------------------------------------------------------------------
 %% Internal functions

+ 2 - 0
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -29,7 +29,9 @@ start(_Type, _Args) ->
     ok = emqx_rule_events:reload(),
     SupRet = emqx_rule_engine_sup:start_link(),
     ok = emqx_rule_engine:load_rules(),
+    emqx_config_handler:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine),
     SupRet.
 
 stop(_State) ->
+    emqx_config_handler:remove_handler(emqx_rule_engine:config_key_path()),
     ok = emqx_rule_events:unload().

+ 6 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl

@@ -6,11 +6,14 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
+-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
+
 all() ->
     emqx_ct:all(?MODULE).
 
 init_per_suite(Config) ->
     application:load(emqx_machine),
+    ok = emqx_config:init_load(emqx_rule_engine_schema, ?CONF_DEFAULT),
     ok = emqx_ct_helpers:start_apps([emqx_rule_engine]),
     Config.
 
@@ -35,6 +38,9 @@ t_crud_rule_api(_Config) ->
         <<"sql">> => <<"SELECT * from \"t/1\"">>
     },
     {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}),
+    %% if we post again with the same params, it return with 400 "rule id already exists"
+    ?assertMatch({400, #{code := _, message := _Message}},
+        emqx_rule_engine_api:crud_rules(post, #{body => Params0})),
 
     ?assertEqual(RuleID, maps:get(id, Rule)),
     {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}),