فهرست منبع

Merge pull request #10336 from sstrigler/EMQX-8507-rule-engine-need-a-new-api-to-crud-rule-engines-setting-configs

feat: add `/rule_engine` API endpoint
Stefan Strigler 2 سال پیش
والد
کامیت
f668ad7b9e

+ 4 - 1
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -26,7 +26,7 @@
 
 -export([roots/0, fields/1]).
 
--type tag() :: rule_creation | rule_test.
+-type tag() :: rule_creation | rule_test | rule_engine.
 
 -spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
 check_params(Params, Tag) ->
@@ -48,12 +48,15 @@ check_params(Params, Tag) ->
 
 roots() ->
     [
+        {"rule_engine", sc(ref("rule_engine"), #{desc => ?DESC("root_rule_engine")})},
         {"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
         {"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
         {"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
         {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}
     ].
 
+fields("rule_engine") ->
+    emqx_rule_engine_schema:rule_engine_settings();
 fields("rule_creation") ->
     emqx_rule_engine_schema:fields("rules");
 fields("rule_info") ->

+ 58 - 11
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -32,6 +32,7 @@
 
 %% API callbacks
 -export([
+    '/rule_engine'/2,
     '/rule_events'/2,
     '/rule_test'/2,
     '/rules'/2,
@@ -41,7 +42,7 @@
 ]).
 
 %% query callback
--export([qs2ms/2, run_fuzzy_match/2, format_rule_resp/1]).
+-export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]).
 
 -define(ERR_BADARGS(REASON), begin
     R0 = err_msg(REASON),
@@ -134,6 +135,7 @@ api_spec() ->
 
 paths() ->
     [
+        "/rule_engine",
         "/rule_events",
         "/rule_test",
         "/rules",
@@ -145,6 +147,9 @@ paths() ->
 error_schema(Code, Message) when is_atom(Code) ->
     emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).
 
+rule_engine_schema() ->
+    ref(emqx_rule_api_schema, "rule_engine").
+
 rule_creation_schema() ->
     ref(emqx_rule_api_schema, "rule_creation").
 
@@ -184,7 +189,7 @@ schema("/rules") ->
             responses => #{
                 200 =>
                     [
-                        {data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})},
+                        {data, mk(array(rule_info_schema()), #{desc => ?DESC("api1_resp")})},
                         {meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
                     ],
                 400 => error_schema('BAD_REQUEST', "Invalid Parameters")
@@ -289,6 +294,26 @@ schema("/rule_test") ->
                 200 => <<"Rule Test Pass">>
             }
         }
+    };
+schema("/rule_engine") ->
+    #{
+        'operationId' => '/rule_engine',
+        get => #{
+            tags => [<<"rules">>],
+            description => ?DESC("api9"),
+            responses => #{
+                200 => rule_engine_schema()
+            }
+        },
+        put => #{
+            tags => [<<"rules">>],
+            description => ?DESC("api10"),
+            'requestBody' => rule_engine_schema(),
+            responses => #{
+                200 => rule_engine_schema(),
+                400 => error_schema('BAD_REQUEST', "Invalid request")
+            }
+        }
     }.
 
 param_path_id() ->
@@ -309,7 +334,7 @@ param_path_id() ->
             QueryString,
             ?RULE_QS_SCHEMA,
             fun ?MODULE:qs2ms/2,
-            fun ?MODULE:format_rule_resp/1
+            fun ?MODULE:format_rule_info_resp/1
         )
     of
         {error, page_limit_invalid} ->
@@ -331,7 +356,7 @@ param_path_id() ->
                     case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
                         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
                             [Rule] = get_one_rule(AllRules, Id),
-                            {201, format_rule_resp(Rule)};
+                            {201, format_rule_info_resp(Rule)};
                         {error, Reason} ->
                             ?SLOG(error, #{
                                 msg => "create_rule_failed",
@@ -362,7 +387,7 @@ param_path_id() ->
 '/rules/:id'(get, #{bindings := #{id := Id}}) ->
     case emqx_rule_engine:get_rule(Id) of
         {ok, Rule} ->
-            {200, format_rule_resp(Rule)};
+            {200, format_rule_info_resp(Rule)};
         not_found ->
             {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
     end;
@@ -372,7 +397,7 @@ param_path_id() ->
     case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
             [Rule] = get_one_rule(AllRules, Id),
-            {200, format_rule_resp(Rule)};
+            {200, format_rule_info_resp(Rule)};
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "update_rule_failed",
@@ -419,6 +444,16 @@ param_path_id() ->
             {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
     end.
 
+'/rule_engine'(get, _Params) ->
+    {200, format_rule_engine_resp(emqx_conf:get([rule_engine]))};
+'/rule_engine'(put, #{body := Params}) ->
+    case rule_engine_update(Params) of
+        {ok, Config} ->
+            {200, format_rule_engine_resp(Config)};
+        {error, Reason} ->
+            {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
+    end.
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
@@ -440,11 +475,9 @@ encode_nested_error(RuleError, Reason) ->
             {RuleError, Reason}
     end.
 
-format_rule_resp(Rules) when is_list(Rules) ->
-    [format_rule_resp(R) || R <- Rules];
-format_rule_resp({Id, Rule}) ->
-    format_rule_resp(Rule#{id => Id});
-format_rule_resp(#{
+format_rule_info_resp({Id, Rule}) ->
+    format_rule_info_resp(Rule#{id => Id});
+format_rule_info_resp(#{
     id := Id,
     name := Name,
     created_at := CreatedAt,
@@ -465,6 +498,9 @@ format_rule_resp(#{
         description => Descr
     }.
 
+format_rule_engine_resp(Config) ->
+    maps:remove(rules, Config).
+
 format_datetime(Timestamp, Unit) ->
     list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
 
@@ -661,3 +697,14 @@ run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, like, Pattern} | Fuzzy]) -
         run_fuzzy_match(E, Fuzzy);
 run_fuzzy_match(E, [_ | Fuzzy]) ->
     run_fuzzy_match(E, Fuzzy).
+
+rule_engine_update(Params) ->
+    case emqx_rule_api_schema:check_params(Params, rule_engine) of
+        {ok, _CheckedParams} ->
+            {ok, #{config := Config}} = emqx_conf:update([rule_engine], Params, #{
+                override_to => cluster
+            }),
+            {ok, Config};
+        {error, Reason} ->
+            {error, Reason}
+    end.

+ 34 - 26
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -27,7 +27,8 @@
     roots/0,
     fields/1,
     desc/1,
-    post_config_update/5
+    post_config_update/5,
+    rule_engine_settings/0
 ]).
 
 -export([validate_sql/1]).
@@ -40,31 +41,13 @@ tags() ->
 roots() -> ["rule_engine"].
 
 fields("rule_engine") ->
-    [
-        {ignore_sys_message,
-            ?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
-        {rules,
-            ?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
-                desc => ?DESC("rule_engine_rules"), default => #{}
-            })},
-        {jq_function_default_timeout,
-            ?HOCON(
-                emqx_schema:duration_ms(),
-                #{
-                    default => <<"10s">>,
-                    desc => ?DESC("rule_engine_jq_function_default_timeout")
-                }
-            )},
-        {jq_implementation_module,
-            ?HOCON(
-                hoconsc:enum([jq_nif, jq_port]),
-                #{
-                    default => jq_nif,
-                    mapping => "jq.jq_implementation_module",
-                    desc => ?DESC("rule_engine_jq_implementation_module")
-                }
-            )}
-    ];
+    rule_engine_settings() ++
+        [
+            {rules,
+                ?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
+                    desc => ?DESC("rule_engine_rules"), default => #{}
+                })}
+        ];
 fields("rules") ->
     [
         rule_name(),
@@ -227,6 +210,31 @@ actions() ->
 qos() ->
     ?UNION([emqx_schema:qos(), binary()]).
 
+rule_engine_settings() ->
+    [
+        {ignore_sys_message,
+            ?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
+        {jq_function_default_timeout,
+            ?HOCON(
+                emqx_schema:duration_ms(),
+                #{
+                    default => <<"10s">>,
+                    desc => ?DESC("rule_engine_jq_function_default_timeout")
+                }
+            )},
+        {jq_implementation_module,
+            ?HOCON(
+                hoconsc:enum([jq_nif, jq_port]),
+                #{
+                    default => jq_nif,
+                    mapping => "jq.jq_implementation_module",
+                    desc => ?DESC("rule_engine_jq_implementation_module"),
+                    deprecated => {since, "v5.0.22"},
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )}
+    ].
+
 validate_sql(Sql) ->
     case emqx_rule_sqlparser:parse(Sql) of
         {ok, _Result} -> ok;

+ 79 - 55
apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl

@@ -23,6 +23,14 @@
 -include_lib("common_test/include/ct.hrl").
 
 -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
+-define(SIMPLE_RULE(NAME_SUFFIX), #{
+    <<"description">> => <<"A simple rule">>,
+    <<"enable">> => true,
+    <<"actions">> => [#{<<"function">> => <<"console">>}],
+    <<"sql">> => <<"SELECT * from \"t/1\"">>,
+    <<"name">> => <<"test_rule", NAME_SUFFIX/binary>>
+}).
+-define(SIMPLE_RULE(ID, NAME_SUFFIX), ?SIMPLE_RULE(NAME_SUFFIX)#{<<"id">> => ID}).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -37,6 +45,9 @@ end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
     ok.
 
+init_per_testcase(t_crud_rule_api, Config) ->
+    meck:new(emqx_json, [passthrough]),
+    init_per_testcase(common, Config);
 init_per_testcase(_, Config) ->
     Config.
 
@@ -48,7 +59,7 @@ end_per_testcase(_, _Config) ->
         emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
     lists:foreach(
         fun(#{id := Id}) ->
-            emqx_rule_engine_api:'/rules/:id'(
+            {204} = emqx_rule_engine_api:'/rules/:id'(
                 delete,
                 #{bindings => #{id => Id}}
             )
@@ -57,45 +68,38 @@ end_per_testcase(_, _Config) ->
     ).
 
 t_crud_rule_api(_Config) ->
-    RuleID = <<"my_rule">>,
-    Params0 = #{
-        <<"description">> => <<"A simple rule">>,
-        <<"enable">> => true,
-        <<"id">> => RuleID,
-        <<"actions">> => [#{<<"function">> => <<"console">>}],
-        <<"sql">> => <<"SELECT * from \"t/1\"">>,
-        <<"name">> => <<"test_rule">>
-    },
-    {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params0}),
-    %% if we post again with the same params, it return with 400 "rule id already exists"
-    ?assertMatch(
-        {400, #{code := _, message := _Message}},
-        emqx_rule_engine_api:'/rules'(post, #{body => Params0})
-    ),
+    RuleId = <<"my_rule">>,
+    Rule = simple_rule_fixture(RuleId, <<>>),
+    ?assertEqual(RuleId, maps:get(id, Rule)),
 
-    ?assertEqual(RuleID, maps:get(id, Rule)),
     {200, #{data := Rules}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
     ct:pal("RList : ~p", [Rules]),
     ?assert(length(Rules) > 0),
 
+    %% if we post again with the same id, it return with 400 "rule id already exists"
+    ?assertMatch(
+        {400, #{code := _, message := _Message}},
+        emqx_rule_engine_api:'/rules'(post, #{body => ?SIMPLE_RULE(RuleId, <<"some_other">>)})
+    ),
+
     {204} = emqx_rule_engine_api:'/rules/:id/metrics/reset'(put, #{
-        bindings => #{id => RuleID}
+        bindings => #{id => RuleId}
     }),
 
-    {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
+    {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
     ct:pal("RShow : ~p", [Rule1]),
     ?assertEqual(Rule, Rule1),
 
-    {200, Metrics} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{bindings => #{id => RuleID}}),
+    {200, Metrics} = emqx_rule_engine_api:'/rules/:id/metrics'(get, #{bindings => #{id => RuleId}}),
     ct:pal("RMetrics : ~p", [Metrics]),
-    ?assertMatch(#{id := RuleID, metrics := _, node_metrics := _}, Metrics),
+    ?assertMatch(#{id := RuleId, metrics := _, node_metrics := _}, Metrics),
 
     {200, Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
-        bindings => #{id => RuleID},
-        body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>}
+        bindings => #{id => RuleId},
+        body => ?SIMPLE_RULE(RuleId)#{<<"sql">> => <<"select * from \"t/b\"">>}
     }),
 
-    {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}),
+    {200, Rule3} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}}),
     %ct:pal("RShow : ~p", [Rule3]),
     ?assertEqual(Rule3, Rule2),
     ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
@@ -112,14 +116,14 @@ t_crud_rule_api(_Config) ->
         {204},
         emqx_rule_engine_api:'/rules/:id'(
             delete,
-            #{bindings => #{id => RuleID}}
+            #{bindings => #{id => RuleId}}
         )
     ),
 
     %ct:pal("Show After Deleted: ~p", [NotFound]),
     ?assertMatch(
         {404, #{code := _, message := _Message}},
-        emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}})
+        emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleId}})
     ),
 
     {400, #{
@@ -174,30 +178,15 @@ t_crud_rule_api(_Config) ->
     ok.
 
 t_list_rule_api(_Config) ->
-    AddIds =
-        lists:map(
-            fun(Seq0) ->
-                Seq = integer_to_binary(Seq0),
-                Params = #{
-                    <<"description">> => <<"A simple rule">>,
-                    <<"enable">> => true,
-                    <<"actions">> => [#{<<"function">> => <<"console">>}],
-                    <<"sql">> => <<"SELECT * from \"t/1\"">>,
-                    <<"name">> => <<"test_rule", Seq/binary>>
-                },
-                {201, #{id := Id}} = emqx_rule_engine_api:'/rules'(post, #{body => Params}),
-                Id
-            end,
-            lists:seq(1, 20)
-        ),
-
+    AddIds = rules_fixture(20),
+    ct:pal("rule ids: ~p", [AddIds]),
     {200, #{data := Rules, meta := #{count := Count}}} =
         emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
     ?assertEqual(20, length(AddIds)),
     ?assertEqual(20, length(Rules)),
     ?assertEqual(20, Count),
 
-    [RuleID | _] = AddIds,
+    [RuleId | _] = AddIds,
     UpdateParams = #{
         <<"description">> => <<"中文的描述也能搜索"/utf8>>,
         <<"enable">> => false,
@@ -206,7 +195,7 @@ t_list_rule_api(_Config) ->
         <<"name">> => <<"test_rule_update1">>
     },
     {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
-        bindings => #{id => RuleID},
+        bindings => #{id => RuleId},
         body => UpdateParams
     }),
     QueryStr1 = #{query_string => #{<<"enable">> => false}},
@@ -229,20 +218,13 @@ t_list_rule_api(_Config) ->
     {200, Result5} = emqx_rule_engine_api:'/rules'(get, QueryStr5),
     ?assertEqual(maps:get(data, Result1), maps:get(data, Result5)),
 
-    QueryStr6 = #{query_string => #{<<"like_id">> => RuleID}},
+    QueryStr6 = #{query_string => #{<<"like_id">> => RuleId}},
     {200, Result6} = emqx_rule_engine_api:'/rules'(get, QueryStr6),
     ?assertEqual(maps:get(data, Result1), maps:get(data, Result6)),
     ok.
 
 t_reset_metrics_on_disable(_Config) ->
-    Params = #{
-        <<"description">> => <<"A simple rule">>,
-        <<"enable">> => true,
-        <<"actions">> => [#{<<"function">> => <<"console">>}],
-        <<"sql">> => <<"SELECT * from \"t/1\"">>,
-        <<"name">> => atom_to_binary(?FUNCTION_NAME)
-    },
-    {201, #{id := RuleId}} = emqx_rule_engine_api:'/rules'(post, #{body => Params}),
+    #{id := RuleId} = simple_rule_fixture(),
 
     %% generate some fake metrics
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'matched', 10),
@@ -256,7 +238,7 @@ t_reset_metrics_on_disable(_Config) ->
     %% disable the rule; metrics should be reset
     {200, _Rule2} = emqx_rule_engine_api:'/rules/:id'(put, #{
         bindings => #{id => RuleId},
-        body => Params#{<<"enable">> := false}
+        body => #{<<"enable">> => false}
     }),
 
     {200, #{metrics := Metrics1}} = emqx_rule_engine_api:'/rules/:id/metrics'(
@@ -281,3 +263,45 @@ test_rule_params(Sql, Payload) ->
             <<"sql">> => Sql
         }
     }.
+
+t_rule_engine(_) ->
+    _ = simple_rule_fixture(),
+    {200, Config} = emqx_rule_engine_api:'/rule_engine'(get, #{}),
+    ?assert(not maps:is_key(rules, Config)),
+    {200, #{
+        jq_function_default_timeout := 12000
+        % hidden! jq_implementation_module := jq_port
+    }} = emqx_rule_engine_api:'/rule_engine'(put, #{
+        body => #{
+            <<"jq_function_default_timeout">> => <<"12s">>,
+            <<"jq_implementation_module">> => <<"jq_port">>
+        }
+    }),
+    SomeRule = #{<<"sql">> => <<"SELECT * FROM \"t/#\"">>},
+    {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{
+        body => #{<<"rules">> => #{<<"some_rule">> => SomeRule}}
+    }),
+    {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
+
+rules_fixture(N) ->
+    lists:map(
+        fun(Seq0) ->
+            Seq = integer_to_binary(Seq0),
+            #{id := Id} = simple_rule_fixture(Seq),
+            Id
+        end,
+        lists:seq(1, N)
+    ).
+
+simple_rule_fixture() ->
+    simple_rule_fixture(<<>>).
+
+simple_rule_fixture(NameSuffix) ->
+    create_rule(?SIMPLE_RULE(NameSuffix)).
+
+simple_rule_fixture(Id, NameSuffix) ->
+    create_rule(?SIMPLE_RULE(Id, NameSuffix)).
+
+create_rule(Params) ->
+    {201, Rule} = emqx_rule_engine_api:'/rules'(post, #{body => Params}),
+    Rule.

+ 1 - 0
changes/ce/feat-10336.en.md

@@ -0,0 +1 @@
+Add `/rule_engine` API endpoint to manage configuration of rule engine.

+ 11 - 0
rel/i18n/emqx_rule_api_schema.hocon

@@ -638,6 +638,17 @@ emqx_rule_api_schema {
                           }
                   }
 
+    root_rule_engine {
+                   desc {
+                         en: "Rule engine configurations. This API can be used to change EMQX rule engine settings. But not for the rules. To list, create, or update rules, call the '/rules' API instead."
+                         zh: "规则引擎配置。该 API 可用于查看和修改规则引擎相关的一些设置。但不可用于规则,如需查看或修改规则,请调用 '/rules' API 进行操作。"
+                        }
+                   label: {
+                           en: "Rule engine configuration"
+                           zh: "规则引擎配置"
+                          }
+                  }
+
     root_rule_creation {
                    desc {
                          en: "Schema for creating rules"

+ 28 - 11
rel/i18n/emqx_rule_engine_api.hocon

@@ -50,7 +50,16 @@ emqx_rule_engine_api {
                          zh: "根据规则来源 Topic 过滤, 使用 MQTT Topic 匹配"
                         }
                   }
-
+    api1_resp {
+        desc {
+            en: "List of rules"
+            zh: "规则列表"
+        }
+        label: {
+            en: "List Rules"
+            zh: "列出所有规则"
+        }
+    }
     api2 {
                    desc {
                          en: "Create a new rule using given Id"
@@ -113,10 +122,9 @@ emqx_rule_engine_api {
                         }
                    label: {
                            en: "Delete Cluster Rule"
-                           zh: "删除集群规则"
+                           zh: "基于给定 ID 新建一条规则"
                           }
                   }
-
     api7 {
                    desc {
                          en: "Reset a rule metrics"
@@ -127,7 +135,6 @@ emqx_rule_engine_api {
                            zh: "重置规则计数"
                           }
                   }
-
     api8 {
                    desc {
                          en: "Test a rule"
@@ -138,14 +145,24 @@ emqx_rule_engine_api {
                            zh: "测试规则"
                           }
                   }
-    desc9 {
+    api9 {
                    desc {
-                         en: "List of rules"
-                         zh: "列出所有规则"
+                         en: "Get rule engine configuration."
+                         zh: "获取规则引擎配置。"
                         }
-                   label: {
-                           en: "List Rules"
-                           zh: "列出所有规则"
+                   label {
+                          en: "Get configuration"
+                          zh: "获取配置"
+                         }
+    }
+    api10 {
+                   desc {
+                         en: "Update rule engine configuration."
+                         zh: "更新规则引擎配置。"
+                        }
+                   label {
+                          en: "Update configuration"
+                          zh: "更新配置"
                           }
-                  }
+    }
 }