Просмотр исходного кода

feat(rules): support configure rules in config file

Shawn 4 лет назад
Родитель
Сommit
b063b6f253

+ 15 - 0
apps/emqx_rule_engine/etc/emqx_rule_engine.conf

@@ -3,4 +3,19 @@
 ##====================================================================
 rule_engine {
     ignore_sys_message = true
+    #rules.my_republish_rule {
+    #    description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'"
+    #    enable = true
+    #    sql = "SELECT * FROM \"t/1\""
+    #    outputs = [
+    #        {
+    #            function = republish
+    #            args = {
+    #                topic = "t/2"
+    #                qos = "${qos}"
+    #                payload = "${payload}"
+    #            }
+    #        }
+    #    ]
+    #}
 }

+ 9 - 16
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -31,35 +31,30 @@
 -type bridge_channel_id() :: binary().
 -type selected_data() :: map().
 -type envs() :: map().
--type output_type() :: bridge | builtin | func.
 -type output_target() :: bridge_channel_id() | atom() | output_fun().
 -type output_fun_args() :: map().
 -type output() :: #{
-        type := output_type(),
-        target := output_target(),
+        function := output_target(),
         args => output_fun_args()
 }.
+
 -type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()).
 
--type rule_info() ::
-       #{ from := list(topic())
-        , outputs := [output()]
+-type rule() ::
+       #{ id := rule_id()
         , sql := binary()
+        , outputs := [output()]
+        , enabled := boolean()
+        , description => binary()
+        , created_at := integer() %% epoch in millisecond precision
+        , from := list(topic())
         , is_foreach := boolean()
         , fields := list()
         , doeach := term()
         , incase := term()
         , conditions := tuple()
-        , enabled := boolean()
-        , description => binary()
         }.
 
--record(rule,
-        { id :: rule_id()
-        , created_at :: integer() %% epoch in millisecond precision
-        , info :: rule_info()
-        }).
-
 %% Arithmetic operators
 -define(is_arith(Op), (Op =:= '+' orelse
                        Op =:= '-' orelse
@@ -94,5 +89,3 @@
 
 %% Tables
 -define(RULE_TAB, emqx_rule).
-
--define(RULE_ENGINE_SHARD, emqx_rule_engine_shard).

+ 1 - 42
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -37,16 +37,7 @@ roots() ->
 
 fields("rule_creation") ->
     [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})}
-    , {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})}
-    , {"outputs", sc(hoconsc:array(hoconsc:union(
-                                   [ ref("bridge_output")
-                                   , ref("builtin_output")
-                                   ])),
-          #{desc => "The outputs of the rule",
-            default => []})}
-    , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
-    , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
-    ];
+    ] ++ emqx_rule_engine_schema:fields("rules");
 
 fields("rule_test") ->
     [ {"context", sc(hoconsc:union([ ref("ctx_pub")
@@ -62,38 +53,6 @@ fields("rule_test") ->
     , {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})}
     ];
 
-fields("bridge_output") ->
-    [ {type, bridge}
-    , {target, sc(binary(), #{desc => "The Channel ID of the bridge"})}
-    ];
-
-fields("builtin_output") ->
-    [ {type, builtin}
-    , {target, sc(binary(), #{desc => "The Name of the built-on output"})}
-    , {args, sc(map(), #{desc => "The arguments of the built-in output",
-        default => #{}})}
-    ];
-
-%% TODO: how to use this in "builtin_output".args ?
-fields("republish_args") ->
-    [ {topic, sc(binary(),
-        #{desc => "The target topic of the re-published message."
-                  " Template with with variables is allowed.",
-          nullable => false})}
-    , {qos, sc(binary(),
-        #{desc => "The qos of the re-published message."
-                  " Template with with variables is allowed. Defaults to ${qos}.",
-          default => <<"${qos}">> })}
-    , {retain, sc(binary(),
-        #{desc => "The retain of the re-published message."
-                  " Template with with variables is allowed. Defaults to ${retain}.",
-          default => <<"${retain}">> })}
-    , {payload, sc(binary(),
-        #{desc => "The payload of the re-published message."
-                  " Template with with variables is allowed. Defaults to ${payload}.",
-          default => <<"${payload}">>})}
-    ];
-
 fields("ctx_pub") ->
     [ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})}
     , {"id", sc(binary(), #{desc => "Message ID"})}

+ 42 - 51
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -19,21 +19,24 @@
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-export([ load_rules/0
+        ]).
+
 -export([ create_rule/1
         , update_rule/1
         , delete_rule/1
         ]).
 
--export_type([rule/0]).
-
--type rule() :: #rule{}.
-
--define(T_RETRY, 60000).
-
 %%------------------------------------------------------------------------------
 %% APIs for rules and resources
 %%------------------------------------------------------------------------------
 
+-spec load_rules() -> ok.
+load_rules() ->
+    lists:foreach(fun({Id, Rule}) ->
+            {ok, _} = create_rule(Rule#{id => Id})
+        end, maps:to_list(emqx:get_config([rule_engine, rules], #{}))).
+
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
 create_rule(Params = #{id := RuleId}) ->
     case emqx_rule_registry:get_rule(RuleId) of
@@ -52,9 +55,7 @@ update_rule(Params = #{id := RuleId}) ->
 delete_rule(RuleId) ->
     case emqx_rule_registry:get_rule(RuleId) of
         {ok, Rule} ->
-            ok = emqx_rule_registry:remove_rule(Rule),
-            _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, clear_rule_metrics, [RuleId]),
-            ok;
+            emqx_rule_registry:remove_rule(Rule);
         not_found ->
             {error, not_found}
     end.
@@ -66,26 +67,23 @@ delete_rule(RuleId) ->
 do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
     case emqx_rule_sqlparser:parse(Sql) of
         {ok, Select} ->
-            Rule = #rule{
-                id = RuleId,
-                created_at = erlang:system_time(millisecond),
-                info = #{
-                    enabled => maps:get(enabled, Params, true),
-                    sql => Sql,
-                    from => emqx_rule_sqlparser:select_from(Select),
-                    outputs => parse_outputs(Outputs),
-                    description => maps:get(description, Params, ""),
-                    %% -- calculated fields:
-                    is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
-                    fields => emqx_rule_sqlparser:select_fields(Select),
-                    doeach => emqx_rule_sqlparser:select_doeach(Select),
-                    incase => emqx_rule_sqlparser:select_incase(Select),
-                    conditions => emqx_rule_sqlparser:select_where(Select)
-                    %% -- calculated fields end
-                }
+            Rule = #{
+                id => RuleId,
+                created_at => erlang:system_time(millisecond),
+                enabled => maps:get(enabled, Params, true),
+                sql => Sql,
+                outputs => parse_outputs(Outputs),
+                description => maps:get(description, Params, ""),
+                %% -- calculated fields:
+                from => emqx_rule_sqlparser:select_from(Select),
+                is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
+                fields => emqx_rule_sqlparser:select_fields(Select),
+                doeach => emqx_rule_sqlparser:select_doeach(Select),
+                incase => emqx_rule_sqlparser:select_incase(Select),
+                conditions => emqx_rule_sqlparser:select_where(Select)
+                %% -- calculated fields end
             },
             ok = emqx_rule_registry:add_rule(Rule),
-            _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, create_rule_metrics, [RuleId]),
             {ok, Rule};
         {error, Reason} -> {error, Reason}
     end.
@@ -93,28 +91,21 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
 parse_outputs(Outputs) ->
     [do_parse_outputs(Out) || Out <- Outputs].
 
-do_parse_outputs(#{type := bridge, target := ChId}) ->
-    #{type => bridge, target => ChId};
-do_parse_outputs(#{type := builtin, target := Repub, args := Args})
+do_parse_outputs(#{function := Repub, args := Args})
         when Repub == republish; Repub == <<"republish">> ->
-    #{type => builtin, target => republish, args => pre_process_repub_args(Args)};
-do_parse_outputs(#{type := Type, target := Name} = Output)
-        when Type == func; Type == builtin ->
-    #{type => Type, target => Name, args => maps:get(args, Output, #{})}.
-
-pre_process_repub_args(#{<<"topic">> := Topic} = Args) ->
-    QoS = maps:get(<<"qos">>, Args, <<"${qos}">>),
-    Retain = maps:get(<<"retain">>, Args, <<"${retain}">>),
-    Payload = maps:get(<<"payload">>, Args, <<"${payload}">>),
-    #{topic => Topic, qos => QoS, payload => Payload, retain => Retain,
-      preprocessed_tmpl => #{
-          topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
-          qos => preproc_vars(QoS),
-          retain => preproc_vars(Retain),
-          payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
-      }}.
-
-preproc_vars(Data) when is_binary(Data) ->
-    emqx_plugin_libs_rule:preproc_tmpl(Data);
-preproc_vars(Data) ->
-    Data.
+    #{function => republish, args => emqx_rule_outputs:pre_process_repub_args(Args)};
+do_parse_outputs(#{function := Func} = Output) ->
+    #{function => parse_output_func(Func), args => maps:get(args, Output, #{})};
+do_parse_outputs(BridgeChannelId) when is_binary(BridgeChannelId) ->
+    BridgeChannelId.
+
+parse_output_func(FuncName) when is_atom(FuncName) ->
+    FuncName;
+parse_output_func(BinFunc) when is_binary(BinFunc) ->
+    try binary_to_existing_atom(BinFunc) of
+        Func -> emqx_rule_outputs:assert_builtin_output(Func)
+    catch
+        error:badarg -> error({unknown_builtin_function, BinFunc})
+    end;
+parse_output_func(Func) when is_function(Func) ->
+    Func.

+ 15 - 13
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -295,13 +295,12 @@ err_msg(Msg) ->
 format_rule_resp(Rules) when is_list(Rules) ->
     [format_rule_resp(R) || R <- Rules];
 
-format_rule_resp(#rule{id = Id, created_at = CreatedAt,
-    info = #{
-        from := Topics,
-        outputs := Output,
-        sql := SQL,
-        enabled := Enabled,
-        description := Descr}}) ->
+format_rule_resp(#{ id := Id, created_at := CreatedAt,
+                    from := Topics,
+                    outputs := Output,
+                    sql := SQL,
+                    enabled := Enabled,
+                    description := Descr}) ->
     #{id => Id,
       from => Topics,
       outputs => format_output(Output),
@@ -318,12 +317,15 @@ format_datetime(Timestamp, Unit) ->
 format_output(Outputs) ->
     [do_format_output(Out) || Out <- Outputs].
 
-do_format_output(#{type := func}) ->
-    #{type => func, target => <<"internal_function">>};
-do_format_output(#{type := builtin, target := Name, args := Args}) ->
-    #{type => builtin, target => Name, args => maps:remove(preprocessed_tmpl, Args)};
-do_format_output(#{type := bridge, target := Name}) ->
-    #{type => bridge, target => Name}.
+do_format_output(#{function := Func}) when is_function(Func) ->
+    FunInfo = erlang:fun_info(Func),
+    FunMod = proplists:get_value(module, FunInfo),
+    FunName = proplists:get_value(name, FunInfo),
+    #{function => list_to_binary(lists:concat([FunMod,":",FunName]))};
+do_format_output(#{function := Name, args := Args}) ->
+    #{function => Name, args => maps:remove(preprocessed_tmpl, Args)};
+do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
+    BridgeChannelId.
 
 get_rule_metrics(Id) ->
     [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))

+ 2 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -25,8 +25,9 @@
 -export([stop/1]).
 
 start(_Type, _Args) ->
-    ok = ekka_rlog:wait_for_shards([?RULE_ENGINE_SHARD], infinity),
+    ets:new(?RULE_TAB, [named_table, public, set, {read_concurrency, true}]),
     ok = emqx_rule_events:reload(),
+    ok = emqx_rule_engine:load_rules(),
     emqx_rule_engine_sup:start_link().
 
 stop(_State) ->

+ 52 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -29,4 +29,55 @@ namespace() -> rule_engine.
 roots() -> ["rule_engine"].
 
 fields("rule_engine") ->
-    [{ignore_sys_message, hoconsc:mk(boolean(), #{default => true})}].
+    [ {ignore_sys_message, sc(boolean(), #{default => true})}
+    , {rules, sc(hoconsc:map("id", ref("rules")), #{desc => "The rules", default => #{}})}
+    ];
+
+fields("rules") ->
+    [ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})}
+    , {"outputs", sc(hoconsc:array(hoconsc:union(
+                                   [ binary()
+                                   , ref("builtin_output_republish")
+                                   , ref("builtin_output_console")
+                                   ])),
+          #{desc => "The outputs of the rule. An output can be a string refers to the channel Id "
+                    "of a emqx bridge, or a object refers to a built-in function.",
+            default => []})}
+    , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
+    , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
+    ];
+
+fields("builtin_output_republish") ->
+    [ {function, sc(republish, #{desc => "Republish the message as a new MQTT message"})}
+    , {args, sc(ref("republish_args"), #{desc => "The arguments of the built-in 'republish' output",
+        default => #{}})}
+    ];
+
+fields("builtin_output_console") ->
+    [ {function, sc(console, #{desc => "Print the outputs to the console"})}
+    %% we may support some args for the console output in the future
+    %, {args, sc(map(), #{desc => "The arguments of the built-in 'console' output",
+    %    default => #{}})}
+    ];
+
+fields("republish_args") ->
+    [ {topic, sc(binary(),
+        #{desc => "The target topic of the re-published message."
+                  " Template with with variables is allowed.",
+          nullable => false})}
+    , {qos, sc(binary(),
+        #{desc => "The qos of the re-published message."
+                  " Template with with variables is allowed. Defaults to ${qos}.",
+          default => <<"${qos}">> })}
+    , {retain, sc(binary(),
+        #{desc => "The retain of the re-published message."
+                  " Template with with variables is allowed. Defaults to ${retain}.",
+          default => <<"${retain}">> })}
+    , {payload, sc(binary(),
+        #{desc => "The payload of the re-published message."
+                  " Template with with variables is allowed. Defaults to ${payload}.",
+          default => <<"${payload}">>})}
+    ].
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+ref(Field) -> hoconsc:ref(?MODULE, Field).

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -64,7 +64,7 @@
 -endif.
 
 reload() ->
-    emqx_rule_registry:load_hooks_for_rule(emqx_rule_registry:get_rules()).
+    emqx_rule_registry:load_hooks_for_rules(emqx_rule_registry:get_rules()).
 
 load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) ->
     emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,

+ 32 - 0
apps/emqx_rule_engine/src/emqx_rule_outputs.erl

@@ -19,10 +19,25 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx.hrl").
 
+-define(OUTPUT_FUNCS,
+        [ console
+        , republish
+        ]).
+
 -export([ console/3
         , republish/3
         ]).
 
+-export([ pre_process_repub_args/1
+        , assert_builtin_output/1
+        ]).
+
+assert_builtin_output(FuncName) ->
+    case lists:member(FuncName, ?OUTPUT_FUNCS) of
+        true -> FuncName;
+        false -> error({unknown_builtin_function, FuncName})
+    end.
+
 -spec console(map(), map(), map()) -> any().
 console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
     ?ULOG("[rule output] ~ts~n"
@@ -75,6 +90,23 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
     _ = emqx_broker:safe_publish(Msg),
     emqx_metrics:inc_msg(Msg).
 
+pre_process_repub_args(#{<<"topic">> := Topic} = Args) ->
+    QoS = maps:get(<<"qos">>, Args, <<"${qos}">>),
+    Retain = maps:get(<<"retain">>, Args, <<"${retain}">>),
+    Payload = maps:get(<<"payload">>, Args, <<"${payload}">>),
+    #{topic => Topic, qos => QoS, payload => Payload, retain => Retain,
+      preprocessed_tmpl => #{
+          topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
+          qos => preproc_vars(QoS),
+          retain => preproc_vars(Retain),
+          payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
+      }}.
+
+preproc_vars(Data) when is_binary(Data) ->
+    emqx_plugin_libs_rule:preproc_tmpl(Data);
+preproc_vars(Data) ->
+    Data.
+
 replace_simple_var(Tokens, Data) when is_list(Tokens) ->
     [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
     Var;

+ 60 - 89
apps/emqx_rule_engine/src/emqx_rule_registry.erl

@@ -36,12 +36,15 @@
         , remove_rules/1
         ]).
 
--export([ load_hooks_for_rule/1
-        , unload_hooks_for_rule/1
+-export([ do_remove_rules/1
+        , do_add_rules/1
         ]).
 
-%% for debug purposes
--export([dump/0]).
+-export([ load_hooks_for_rules/1
+        , unload_hooks_for_rule/1
+        , add_metrics_for_rules/1
+        , clear_metrics_for_rules/1
+        ]).
 
 %% gen_server Callbacks
 -export([ init/1
@@ -52,39 +55,10 @@
         , code_change/3
         ]).
 
-%% Mnesia bootstrap
--export([mnesia/1]).
-
--boot_mnesia({mnesia, [boot]}).
--copy_mnesia({mnesia, [copy]}).
-
 -define(REGISTRY, ?MODULE).
 
 -define(T_CALL, 10000).
 
-%%------------------------------------------------------------------------------
-%% Mnesia bootstrap
-%%------------------------------------------------------------------------------
-
-%% @doc Create or replicate tables.
--spec(mnesia(boot | copy) -> ok).
-mnesia(boot) ->
-    %% Optimize storage
-    StoreProps = [{ets, [{read_concurrency, true}]}],
-    %% Rule table
-    ok = ekka_mnesia:create_table(?RULE_TAB, [
-                {rlog_shard, ?RULE_ENGINE_SHARD},
-                {disc_copies, [node()]},
-                {record_name, rule},
-                {attributes, record_info(fields, rule)},
-                {storage_properties, StoreProps}]);
-
-mnesia(copy) ->
-    ok = ekka_mnesia:copy_table(?RULE_TAB, disc_copies).
-
-dump() ->
-    ?ULOG("Rules: ~p~n", [ets:tab2list(?RULE_TAB)]).
-
 %%------------------------------------------------------------------------------
 %% Start the registry
 %%------------------------------------------------------------------------------
@@ -97,90 +71,102 @@ start_link() ->
 %% Rule Management
 %%------------------------------------------------------------------------------
 
--spec(get_rules() -> list(emqx_rule_engine:rule())).
+-spec(get_rules() -> [rule()]).
 get_rules() ->
     get_all_records(?RULE_TAB).
 
 get_rules_ordered_by_ts() ->
-    F = fun() ->
-        Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]),
-        qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
-    end,
-    {atomic, List} = ekka_mnesia:transaction(?RULE_ENGINE_SHARD, F),
-    List.
-
--spec(get_rules_for_topic(Topic :: binary()) -> list(emqx_rule_engine:rule())).
+    lists:sort(fun(#{created_at := CreatedA}, #{created_at := CreatedB}) ->
+            CreatedA =< CreatedB
+        end, get_rules()).
+
+-spec(get_rules_for_topic(Topic :: binary()) -> [rule()]).
 get_rules_for_topic(Topic) ->
-    [Rule || Rule = #rule{info = #{from := From}} <- get_rules(),
+    [Rule || Rule = #{from := From} <- get_rules(),
              emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)].
 
--spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
+-spec(get_rules_with_same_event(Topic :: binary()) -> [rule()]).
 get_rules_with_same_event(Topic) ->
     EventName = emqx_rule_events:event_name(Topic),
-    [Rule || Rule = #rule{info = #{from := From}} <- get_rules(),
+    [Rule || Rule = #{from := From} <- get_rules(),
              lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)].
 
 is_of_event_name(EventName, Topic) ->
     EventName =:= emqx_rule_events:event_name(Topic).
 
--spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found).
+-spec(get_rule(Id :: rule_id()) -> {ok, rule()} | not_found).
 get_rule(Id) ->
-    case mnesia:dirty_read(?RULE_TAB, Id) of
-        [Rule] -> {ok, Rule};
+    case ets:lookup(?RULE_TAB, Id) of
+        [{Id, Rule}] -> {ok, Rule#{id => Id}};
         [] -> not_found
     end.
 
--spec(add_rule(emqx_rule_engine:rule()) -> ok).
-add_rule(Rule) when is_record(Rule, rule) ->
+-spec(add_rule(rule()) -> ok).
+add_rule(Rule) ->
     add_rules([Rule]).
 
--spec(add_rules(list(emqx_rule_engine:rule())) -> ok).
+-spec(add_rules([rule()]) -> ok).
 add_rules(Rules) ->
     gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL).
 
--spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok).
+-spec(remove_rule(rule() | rule_id()) -> ok).
 remove_rule(RuleOrId) ->
     remove_rules([RuleOrId]).
 
--spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok).
+-spec(remove_rules([rule()] | list(rule_id())) -> ok).
 remove_rules(Rules) ->
     gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
 
 %% @private
 
-insert_rules([]) -> ok;
-insert_rules(Rules) ->
-    _ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rules]),
-    [mnesia:write(?RULE_TAB, Rule, write) ||Rule <- Rules].
+do_add_rules([]) -> ok;
+do_add_rules(Rules) ->
+    load_hooks_for_rules(Rules),
+    add_metrics_for_rules(Rules),
+    ets:insert(?RULE_TAB, [{Id, maps:remove(id, R)} || #{id := Id} = R <- Rules]),
+    ok.
 
 %% @private
-delete_rules([]) -> ok;
-delete_rules(Rules = [R|_]) when is_binary(R) ->
+do_remove_rules([]) -> ok;
+do_remove_rules(RuleIds = [Id|_]) when is_binary(Id) ->
     RuleRecs =
         lists:foldl(fun(RuleId, Acc) ->
             case get_rule(RuleId) of
                 {ok, Rule} ->  [Rule|Acc];
                 not_found -> Acc
             end
-        end, [], Rules),
-    delete_rules_unload_hooks(RuleRecs);
-delete_rules(Rules = [Rule|_]) when is_record(Rule, rule) ->
-    delete_rules_unload_hooks(Rules).
-
-delete_rules_unload_hooks(Rules) ->
-    _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, unload_hooks_for_rule, [Rules]),
-    [mnesia:delete_object(?RULE_TAB, Rule, write) ||Rule <- Rules].
+        end, [], RuleIds),
+    remove_rules_unload_hooks(RuleRecs);
+do_remove_rules(Rules = [Rule|_]) when is_map(Rule) ->
+    remove_rules_unload_hooks(Rules).
+
+remove_rules_unload_hooks(Rules) ->
+    unload_hooks_for_rule(Rules),
+    clear_metrics_for_rules(Rules),
+    lists:foreach(fun(#{id := Id}) ->
+            ets:delete(?RULE_TAB, Id)
+        end, Rules).
 
-load_hooks_for_rule(Rules) ->
-    lists:foreach(fun(#rule{info = #{from := Topics}}) ->
+load_hooks_for_rules(Rules) ->
+    lists:foreach(fun(#{from := Topics}) ->
             lists:foreach(fun emqx_rule_events:load/1, Topics)
         end, Rules).
 
+add_metrics_for_rules(Rules) ->
+    lists:foreach(fun(#{id := Id}) ->
+            ok = emqx_rule_metrics:create_rule_metrics(Id)
+        end, Rules).
+
+clear_metrics_for_rules(Rules) ->
+    lists:foreach(fun(#{id := Id}) ->
+            ok = emqx_rule_metrics:clear_rule_metrics(Id)
+        end, Rules).
+
 unload_hooks_for_rule(Rules) ->
-    lists:foreach(fun(#rule{id = Id, info = #{from := Topics}}) ->
+    lists:foreach(fun(#{id := Id, from := Topics}) ->
         lists:foreach(fun(Topic) ->
             case get_rules_with_same_event(Topic) of
-                [#rule{id = Id0}] when Id0 == Id -> %% we are now deleting the last rule
+                [#{id := Id0}] when Id0 == Id -> %% we are now deleting the last rule
                     emqx_rule_events:unload(Topic);
                 _ -> ok
             end
@@ -197,11 +183,11 @@ init([]) ->
     {ok, #{}}.
 
 handle_call({add_rules, Rules}, _From, State) ->
-    trans(fun insert_rules/1, [Rules]),
+    _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_add_rules, [Rules]),
     {reply, ok, State};
 
 handle_call({remove_rules, Rules}, _From, State) ->
-    trans(fun delete_rules/1, [Rules]),
+    _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_remove_rules, [Rules]),
     {reply, ok, State};
 
 handle_call(Req, _From, State) ->
@@ -227,19 +213,4 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 
 get_all_records(Tab) ->
-    %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
-    %% Wrapping ets to a transaction to avoid reading inconsistent
-    %% ( nest cluster_call transaction, no a r/o transaction)
-    %% data during shard bootstrap
-    {atomic, Ret} =
-        ekka_mnesia:transaction(?RULE_ENGINE_SHARD,
-                                   fun() ->
-                                           ets:tab2list(Tab)
-                                   end),
-    Ret.
-
-trans(Fun, Args) ->
-    case ekka_mnesia:transaction(?RULE_ENGINE_SHARD, Fun, Args) of
-        {atomic, Result} -> Result;
-        {aborted, Reason} -> error(Reason)
-    end.
+    [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].

+ 17 - 27
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -45,12 +45,12 @@
 %%------------------------------------------------------------------------------
 %% Apply rules
 %%------------------------------------------------------------------------------
--spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok).
+-spec(apply_rules(list(rule()), input()) -> ok).
 apply_rules([], _Input) ->
     ok;
-apply_rules([#rule{info = #{enabled := false}}|More], Input) ->
+apply_rules([#{enabled := false}|More], Input) ->
     apply_rules(More, Input);
-apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
+apply_rules([Rule = #{id := RuleID}|More], Input) ->
     try apply_rule_discard_result(Rule, Input)
     catch
         %% ignore the errors if select or match failed
@@ -80,18 +80,19 @@ apply_rule_discard_result(Rule, Input) ->
     _ = apply_rule(Rule, Input),
     ok.
 
-apply_rule(Rule = #rule{id = RuleID}, Input) ->
+apply_rule(Rule = #{id := RuleID}, Input) ->
     clear_rule_payload(),
     do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
 
-do_apply_rule(#rule{id = RuleId, info = #{
+do_apply_rule(#{
+            id := RuleId,
             is_foreach := true,
             fields := Fields,
             doeach := DoEach,
             incase := InCase,
             conditions := Conditions,
             outputs := Outputs
-        }}, Input) ->
+        }, Input) ->
     {Selected, Collection} = ?RAISE(select_and_collect(Fields, Input),
                                         {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
     ColumnsAndSelected = maps:merge(Input, Selected),
@@ -105,12 +106,12 @@ do_apply_rule(#rule{id = RuleId, info = #{
             {error, nomatch}
     end;
 
-do_apply_rule(#rule{id = RuleId, info = #{
-            is_foreach := false,
-            fields := Fields,
-            conditions := Conditions,
-            outputs := Outputs
-        }}, Input) ->
+do_apply_rule(#{id := RuleId,
+                is_foreach := false,
+                fields := Fields,
+                conditions := Conditions,
+                outputs := Outputs
+            }, Input) ->
     Selected = ?RAISE(select_and_transform(Fields, Input),
                       {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
     case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
@@ -246,25 +247,14 @@ handle_output(OutId, Selected, Envs) ->
                           })
     end.
 
-do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) ->
+do_handle_output(ChannelId, Selected, _Envs) when is_binary(ChannelId) ->
     ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}),
     emqx_bridge:send_message(ChannelId, Selected);
-do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) ->
+do_handle_output(#{function := Func} = Out, Selected, Envs) when is_function(Func) ->
     erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);
-do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)
-        when is_atom(Output) ->
-    handle_builtin_output(Output, Selected, Envs, maps:get(args, Out, #{}));
-do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)
-        when is_binary(Output) ->
-    try binary_to_existing_atom(Output) of
-        Func -> handle_builtin_output(Func, Selected, Envs, maps:get(args, Out, #{}))
-    catch
-        error:badarg -> error(not_found)
-    end.
-
-handle_builtin_output(Func, Selected, Envs, Args) ->
+do_handle_output(#{function := Func} = Out, Selected, Envs) when is_atom(Func) ->
     case erlang:function_exported(emqx_rule_outputs, Func, 3) of
-        true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, Args]);
+        true -> erlang:apply(emqx_rule_outputs, Func, [Selected, Envs, maps:get(args, Out, #{})]);
         false -> error(not_found)
     end.
 

+ 12 - 14
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -42,20 +42,18 @@ test(#{sql := Sql, context := Context}) ->
 test_rule(Sql, Select, Context, EventTopics) ->
     RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
     ok = emqx_rule_metrics:create_rule_metrics(RuleId),
-    Rule = #rule{
-        id = RuleId,
-        info = #{
-            sql => Sql,
-            from => EventTopics,
-            outputs => [#{type => func, target => fun ?MODULE:get_selected_data/3, args => #{}}],
-            enabled => true,
-            is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
-            fields => emqx_rule_sqlparser:select_fields(Select),
-            doeach => emqx_rule_sqlparser:select_doeach(Select),
-            incase => emqx_rule_sqlparser:select_incase(Select),
-            conditions => emqx_rule_sqlparser:select_where(Select)
-        },
-        created_at = erlang:system_time(millisecond)
+    Rule = #{
+        id => RuleId,
+        sql => Sql,
+        from => EventTopics,
+        outputs => [#{function => fun ?MODULE:get_selected_data/3, args => #{}}],
+        enabled => true,
+        is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
+        fields => emqx_rule_sqlparser:select_fields(Select),
+        doeach => emqx_rule_sqlparser:select_doeach(Select),
+        incase => emqx_rule_sqlparser:select_incase(Select),
+        conditions => emqx_rule_sqlparser:select_where(Select),
+        created_at => erlang:system_time(millisecond)
     },
     FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
     try

+ 24 - 74
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -30,7 +30,6 @@
 
 all() ->
     [ {group, engine}
-    , {group, api}
     , {group, funcs}
     , {group, registry}
     , {group, runtime}
@@ -45,9 +44,6 @@ groups() ->
     [{engine, [sequence],
       [t_create_rule
       ]},
-     {api, [],
-      [t_crud_rule_api
-       ]},
      {funcs, [],
       [t_kv_store
       ]},
@@ -108,8 +104,6 @@ groups() ->
 
 init_per_suite(Config) ->
     application:load(emqx_machine),
-    ok = ekka_mnesia:start(),
-    ok = emqx_rule_registry:mnesia(boot),
     ok = emqx_ct_helpers:start_apps([emqx_rule_engine]),
     Config.
 
@@ -155,12 +149,12 @@ init_per_testcase(t_events, Config) ->
         #{id => <<"rule:t_events">>,
             sql => SQL,
             outputs => [
-                #{type => builtin, target => console},
-                #{type => func, target => fun ?MODULE:output_record_triggered_events/3,
+                #{function => console},
+                #{function => fun ?MODULE:output_record_triggered_events/3,
                   args => #{}}
             ],
             description => <<"to console and record triggered events">>}),
-    ?assertMatch(#rule{id = <<"rule:t_events">>}, Rule),
+    ?assertMatch(#{id := <<"rule:t_events">>}, Rule),
     [{hook_points_rules, Rule} | Config];
 init_per_testcase(_TestCase, Config) ->
     emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
@@ -176,59 +170,17 @@ end_per_testcase(_TestCase, _Config) ->
 %% Test cases for rule engine
 %%------------------------------------------------------------------------------
 t_create_rule(_Config) ->
-    {ok, #rule{id = Id}} = emqx_rule_engine:create_rule(
+    {ok, #{id := Id}} = emqx_rule_engine:create_rule(
             #{sql => <<"select * from \"t/a\"">>,
               id => <<"t_create_rule">>,
-              outputs => [#{type => builtin, target => console}],
+              outputs => [#{function => console}],
               description => <<"debug rule">>}),
     ct:pal("======== emqx_rule_registry:get_rules :~p", [emqx_rule_registry:get_rules()]),
-    ?assertMatch({ok, #rule{id = Id, info = #{from := [<<"t/a">>]}}},
+    ?assertMatch({ok, #{id := Id, from := [<<"t/a">>]}},
         emqx_rule_registry:get_rule(Id)),
     emqx_rule_registry:remove_rule(Id),
     ok.
 
-%%------------------------------------------------------------------------------
-%% Test cases for rule engine api
-%%------------------------------------------------------------------------------
-
-t_crud_rule_api(_Config) ->
-    RuleID = <<"my_rule">>,
-    Params0 = #{
-        <<"description">> => <<"A simple rule">>,
-        <<"enable">> => true,
-        <<"id">> => RuleID,
-        <<"outputs">> => [#{<<"type">> => <<"builtin">>, <<"target">> => <<"console">>}],
-        <<"sql">> => <<"SELECT * from \"t/1\"">>
-    },
-    {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}),
-
-    ?assertEqual(RuleID, maps:get(id, Rule)),
-    {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}),
-    ct:pal("RList : ~p", [Rules]),
-    ?assert(length(Rules) > 0),
-
-    {200, Rule1} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
-    ct:pal("RShow : ~p", [Rule1]),
-    ?assertEqual(Rule, Rule1),
-
-    {200, Rule2} = emqx_rule_engine_api:crud_rules_by_id(put, #{
-            bindings => #{id => RuleID},
-            body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>}
-        }),
-
-    {200, Rule3} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
-    %ct:pal("RShow : ~p", [Rule3]),
-    ?assertEqual(Rule3, Rule2),
-    ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
-
-    ?assertMatch({200}, emqx_rule_engine_api:crud_rules_by_id(delete,
-        #{bindings => #{id => RuleID}})),
-
-    %ct:pal("Show After Deleted: ~p", [NotFound]),
-    ?assertMatch({404, #{code := _, message := _Message}},
-        emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}})),
-    ok.
-
 %%------------------------------------------------------------------------------
 %% Test cases for rule funcs
 %%------------------------------------------------------------------------------
@@ -248,14 +200,14 @@ t_kv_store(_) ->
 t_add_get_remove_rule(_Config) ->
     RuleId0 = <<"rule-debug-0">>,
     ok = emqx_rule_registry:add_rule(make_simple_rule(RuleId0)),
-    ?assertMatch({ok, #rule{id = RuleId0}}, emqx_rule_registry:get_rule(RuleId0)),
+    ?assertMatch({ok, #{id := RuleId0}}, emqx_rule_registry:get_rule(RuleId0)),
     ok = emqx_rule_registry:remove_rule(RuleId0),
     ?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId0)),
 
     RuleId1 = <<"rule-debug-1">>,
     Rule1 = make_simple_rule(RuleId1),
     ok = emqx_rule_registry:add_rule(Rule1),
-    ?assertMatch({ok, #rule{id = RuleId1}}, emqx_rule_registry:get_rule(RuleId1)),
+    ?assertMatch({ok, #{id := RuleId1}}, emqx_rule_registry:get_rule(RuleId1)),
     ok = emqx_rule_registry:remove_rule(Rule1),
     ?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId1)),
     ok.
@@ -282,9 +234,9 @@ t_create_existing_rule(_Config) ->
     {ok, _} = emqx_rule_engine:create_rule(
                     #{id => <<"an_existing_rule">>,
                       sql => <<"select * from \"t/#\"">>,
-                      outputs => [#{type => builtin, target => console}]
+                      outputs => [#{function => console}]
                      }),
-    {ok, #rule{info = #{sql := SQL}}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>),
+    {ok, #{sql := SQL}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>),
     ?assertEqual(<<"select * from \"t/#\"">>, SQL),
 
     ok = emqx_rule_engine:delete_rule(<<"an_existing_rule">>),
@@ -308,9 +260,9 @@ t_get_rules_ordered_by_ts(_Config) ->
              make_simple_rule_with_ts(<<"rule-debug-2">>, Now())
              ]),
     ?assertMatch([
-        #rule{id = <<"rule-debug-0">>},
-        #rule{id = <<"rule-debug-1">>},
-        #rule{id = <<"rule-debug-2">>}
+        #{id := <<"rule-debug-0">>},
+        #{id := <<"rule-debug-1">>},
+        #{id := <<"rule-debug-2">>}
     ], emqx_rule_registry:get_rules_ordered_by_ts()).
 
 t_get_rules_for_topic_2(_Config) ->
@@ -1349,7 +1301,7 @@ t_sqlparse_nested_get(_Config) ->
 republish_output(Topic) ->
     republish_output(Topic, <<"${payload}">>).
 republish_output(Topic, Payload) ->
-    #{type => builtin, target => republish,
+    #{function => republish,
       args => #{<<"payload">> => Payload, <<"topic">> => Topic, <<"qos">> => 0}}.
 
 make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
@@ -1366,18 +1318,16 @@ make_simple_rule(RuleId, SQL, Topics) when is_binary(RuleId) ->
     make_simple_rule(RuleId, SQL, Topics, erlang:system_time(millisecond)).
 
 make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
-    #rule{
-        id = RuleId,
-        info = #{
-            sql => SQL,
-            from => Topics,
-            fields => [<<"*">>],
-            is_foreach => false,
-            conditions => {},
-            ouputs => [#{type => builtin, target => console}],
-            description => <<"simple rule">>
-        },
-        created_at = Ts
+    #{
+      id => RuleId,
+      sql => SQL,
+      from => Topics,
+      fields => [<<"*">>],
+      is_foreach => false,
+      conditions => {},
+      ouputs => [#{function => console}],
+      description => <<"simple rule">>,
+      created_at => Ts
     }.
 
 output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) ->

+ 64 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl

@@ -0,0 +1,64 @@
+-module(emqx_rule_engine_api_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    application:load(emqx_machine),
+    ok = emqx_ct_helpers:start_apps([emqx_rule_engine]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([emqx_rule_engine]),
+    ok.
+
+init_per_testcase(_, Config) ->
+    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok.
+
+t_crud_rule_api(_Config) ->
+    RuleID = <<"my_rule">>,
+    Params0 = #{
+        <<"description">> => <<"A simple rule">>,
+        <<"enable">> => true,
+        <<"id">> => RuleID,
+        <<"outputs">> => [#{<<"function">> => <<"console">>}],
+        <<"sql">> => <<"SELECT * from \"t/1\"">>
+    },
+    {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}),
+
+    ?assertEqual(RuleID, maps:get(id, Rule)),
+    {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}),
+    ct:pal("RList : ~p", [Rules]),
+    ?assert(length(Rules) > 0),
+
+    {200, Rule1} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
+    ct:pal("RShow : ~p", [Rule1]),
+    ?assertEqual(Rule, Rule1),
+
+    {200, Rule2} = emqx_rule_engine_api:crud_rules_by_id(put, #{
+            bindings => #{id => RuleID},
+            body => Params0#{<<"sql">> => <<"select * from \"t/b\"">>}
+        }),
+
+    {200, Rule3} = emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}}),
+    %ct:pal("RShow : ~p", [Rule3]),
+    ?assertEqual(Rule3, Rule2),
+    ?assertEqual(<<"select * from \"t/b\"">>, maps:get(sql, Rule3)),
+
+    ?assertMatch({200}, emqx_rule_engine_api:crud_rules_by_id(delete,
+        #{bindings => #{id => RuleID}})),
+
+    %ct:pal("Show After Deleted: ~p", [NotFound]),
+    ?assertMatch({404, #{code := _, message := _Message}},
+        emqx_rule_engine_api:crud_rules_by_id(get, #{bindings => #{id => RuleID}})),
+    ok.