ソースを参照

refactor(rules): remove resources and actions

Shawn 4 年 前
コミット
af295a9b71

+ 1 - 1
apps/emqx_rule_engine/etc/emqx_rule_engine.conf

@@ -1,6 +1,6 @@
 ##====================================================================
 ## Rule Engine for EMQ X R5.0
 ##====================================================================
-emqx_rule_engine {
+rule_engine {
     ignore_sys_message = true
 }

+ 15 - 91
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -23,14 +23,6 @@
 -type(rule_id() :: binary()).
 -type(rule_name() :: binary()).
 
--type(resource_id() :: binary()).
--type(action_instance_id() :: binary()).
-
--type(action_name() :: atom()).
--type(resource_type_name() :: atom()).
-
--type(category() :: data_persist| data_forward | offline_msgs | debug | other).
-
 -type(descr() :: #{en := binary(), zh => binary()}).
 
 -type(mf() :: {Module::atom(), Fun::atom()}).
@@ -38,89 +30,27 @@
 -type(hook() :: atom() | 'any').
 
 -type(topic() :: binary()).
-
--type(resource_status() :: #{ alive := boolean()
-                            , atom() => binary() | atom() | list(binary()|atom())
-                            }).
-
--define(descr, #{en => <<>>, zh => <<>>}).
-
--record(action,
-        { name :: action_name()
-        , category :: category()
-        , for :: hook()
-        , app :: atom()
-        , types = [] :: list(resource_type_name())
-        , module :: module()
-        , on_create :: mf()
-        , on_destroy :: maybe(mf())
-        , hidden = false :: boolean()
-        , params_spec :: #{atom() => term()} %% params specs
-        , title = ?descr :: descr()
-        , description = ?descr :: descr()
+-type(bridge_channel_id() :: binary()).
+
+-type(rule_info() ::
+       #{ from := list(topic())
+        , to := list(bridge_channel_id() | fun())
+        , sql := binary()
+        , is_foreach := boolean()
+        , fields := list()
+        , doeach := term()
+        , incase := list()
+        , conditions := tuple()
+        , enabled := boolean()
+        , description := binary()
         }).
 
--record(action_instance,
-        { id :: action_instance_id()
-        , name :: action_name()
-        , fallbacks :: list(#action_instance{})
-        , args :: #{binary() => term()} %% the args got from API for initializing action_instance
-        }).
+-define(descr, #{en => <<>>, zh => <<>>}).
 
 -record(rule,
         { id :: rule_id()
-        , for :: list(topic())
-        , rawsql :: binary()
-        , is_foreach :: boolean()
-        , fields :: list()
-        , doeach :: term()
-        , incase :: list()
-        , conditions :: tuple()
-        , on_action_failed :: continue | stop
-        , actions :: list(#action_instance{})
-        , enabled :: boolean()
         , created_at :: integer() %% epoch in millisecond precision
-        , description :: binary()
-        , state = normal :: atom()
-        }).
-
--record(resource,
-        { id :: resource_id()
-        , type :: resource_type_name()
-        , config :: #{} %% the configs got from API for initializing resource
-        , created_at :: integer() | undefined %% epoch in millisecond precision
-        , description :: binary()
-        }).
-
--record(resource_type,
-        { name :: resource_type_name()
-        , provider :: atom()
-        , params_spec :: #{atom() => term()} %% params specs
-        , on_create :: mf()
-        , on_status :: mf()
-        , on_destroy :: mf()
-        , title = ?descr :: descr()
-        , description = ?descr :: descr()
-        }).
-
--record(rule_hooks,
-        { hook :: atom()
-        , rule_id :: rule_id()
-        }).
-
--record(resource_params,
-        { id :: resource_id()
-        , params :: #{} %% the params got after initializing the resource
-        , status = #{is_alive => false} :: #{is_alive := boolean(), atom() => term()}
-        }).
-
--record(action_instance_params,
-        { id :: action_instance_id()
-        %% the params got after initializing the action
-        , params :: #{}
-        %% the Func/Bindings got after initializing the action
-        , apply :: fun((Data::map(), Envs::map()) -> any())
-                 | #{mod := module(), bindings := #{atom() => term()}}
+        , info :: rule_info()
         }).
 
 %% Arithmetic operators
@@ -157,9 +87,3 @@
 
 %% Tables
 -define(RULE_TAB, emqx_rule).
--define(ACTION_TAB, emqx_rule_action).
--define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params).
--define(RES_TAB, emqx_resource).
--define(RES_PARAMS_TAB, emqx_resource_params).
--define(RULE_HOOKS, emqx_rule_hooks).
--define(RES_TYPE_TAB, emqx_resource_type).

+ 0 - 208
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -1,208 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% Define the default actions.
--module(emqx_rule_actions).
-
--include("rule_engine.hrl").
--include("rule_actions.hrl").
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/logger.hrl").
-
--define(REPUBLISH_PARAMS_SPEC, #{
-            target_topic => #{
-                order => 1,
-                type => string,
-                required => true,
-                default => <<"repub/to/${clientid}">>,
-                title => #{en => <<"Target Topic">>,
-                           zh => <<"目的主题"/utf8>>},
-                description => #{en => <<"To which topic the message will be republished">>,
-                                 zh => <<"重新发布消息到哪个主题"/utf8>>}
-            },
-            target_qos => #{
-                order => 2,
-                type => number,
-                enum => [-1, 0, 1, 2],
-                required => true,
-                default => 0,
-                title => #{en => <<"Target QoS">>,
-                           zh => <<"目的 QoS"/utf8>>},
-                description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>,
-                                 zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>}
-            },
-            payload_tmpl => #{
-                order => 3,
-                type => string,
-                input => textarea,
-                required => false,
-                default => <<"${payload}">>,
-                title => #{en => <<"Payload Template">>,
-                           zh => <<"消息内容模板"/utf8>>},
-                description => #{en => <<"The payload template, variable interpolation is supported">>,
-                                 zh => <<"消息内容模板,支持变量"/utf8>>}
-            }
-        }).
-
--rule_action(#{name => inspect,
-               category => debug,
-               for => '$any',
-               types => [],
-               create => on_action_create_inspect,
-               params => #{},
-               title => #{en => <<"Inspect (debug)">>,
-                          zh => <<"检查 (调试)"/utf8>>},
-               description => #{en => <<"Inspect the details of action params for debug purpose">>,
-                                zh => <<"检查动作参数 (用以调试)"/utf8>>}
-              }).
-
--rule_action(#{name => republish,
-               category => data_forward,
-               for => '$any',
-               types => [],
-               create => on_action_create_republish,
-               params => ?REPUBLISH_PARAMS_SPEC,
-               title => #{en => <<"Republish">>,
-                          zh => <<"消息重新发布"/utf8>>},
-               description => #{en => <<"Republish a MQTT message to another topic">>,
-                                zh => <<"重新发布消息到另一个主题"/utf8>>}
-              }).
-
--rule_action(#{name => do_nothing,
-               category => debug,
-               for => '$any',
-               types => [],
-               create => on_action_create_do_nothing,
-               params => #{},
-               title => #{en => <<"Do Nothing (debug)">>,
-                          zh => <<"空动作 (调试)"/utf8>>},
-               description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>,
-                                zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>}
-              }).
-
--export([on_resource_create/2]).
-
-%% callbacks for rule engine
--export([ on_action_create_inspect/2
-        , on_action_create_republish/2
-        , on_action_create_do_nothing/2
-        ]).
-
--export([ on_action_inspect/2
-        , on_action_republish/2
-        , on_action_do_nothing/2
-        ]).
-
--spec(on_resource_create(binary(), map()) -> map()).
-on_resource_create(_Name, Conf) ->
-    Conf.
-
-%%------------------------------------------------------------------------------
-%% Action 'inspect'
-%%------------------------------------------------------------------------------
--spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
-on_action_create_inspect(Id, Params) ->
-    Params.
-
--spec on_action_inspect(selected_data(), env_vars()) -> any().
-on_action_inspect(Selected, Envs) ->
-    ?ULOG("[inspect]~n"
-          "\tSelected Data: ~p~n"
-          "\tEnvs: ~p~n"
-          "\tAction Init Params: ~p~n", [Selected, Envs, ?bound_v('Params', Envs)]),
-    emqx_rule_metrics:inc_actions_success(?bound_v('Id', Envs)).
-
-
-%%------------------------------------------------------------------------------
-%% Action 'republish'
-%%------------------------------------------------------------------------------
--spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
-on_action_create_republish(Id, Params = #{
-        <<"target_topic">> := TargetTopic,
-        <<"target_qos">> := TargetQoS,
-        <<"payload_tmpl">> := PayloadTmpl
-       }) ->
-    TopicTks = emqx_plugin_libs_rule:preproc_tmpl(TargetTopic),
-    PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl),
-    Params.
-
--spec on_action_republish(selected_data(), env_vars()) -> any().
-on_action_republish(_Selected, Envs = #{
-            topic := Topic,
-            headers := #{republish_by := ActId},
-            ?BINDING_KEYS := #{'Id' := ActId}
-        }) ->
-    ?LOG(error, "[republish] recursively republish detected, msg topic: ~p, target topic: ~p",
-        [Topic, ?bound_v('TargetTopic', Envs)]),
-    emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs));
-
-on_action_republish(Selected, _Envs = #{
-            qos := QoS, flags := Flags, timestamp := Timestamp,
-            ?BINDING_KEYS := #{
-                'Id' := ActId,
-                'TargetTopic' := TargetTopic,
-                'TargetQoS' := TargetQoS,
-                'TopicTks' := TopicTks,
-                'PayloadTks' := PayloadTks
-            }}) ->
-    ?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
-        [TargetTopic, Selected]),
-    increase_and_publish(ActId,
-        #message{
-            id = emqx_guid:gen(),
-            qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end,
-            from = ActId,
-            flags = Flags,
-            headers = #{republish_by => ActId},
-            topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
-            payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
-            timestamp = Timestamp
-        });
-
-%% in case this is not a "message.publish" request
-on_action_republish(Selected, _Envs = #{
-            ?BINDING_KEYS := #{
-                'Id' := ActId,
-                'TargetTopic' := TargetTopic,
-                'TargetQoS' := TargetQoS,
-                'TopicTks' := TopicTks,
-                'PayloadTks' := PayloadTks
-            }}) ->
-    ?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
-        [TargetTopic, Selected]),
-    increase_and_publish(ActId,
-        #message{
-            id = emqx_guid:gen(),
-            qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end,
-            from = ActId,
-            flags = #{dup => false, retain => false},
-            headers = #{republish_by => ActId},
-            topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
-            payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
-            timestamp = erlang:system_time(millisecond)
-        }).
-
-increase_and_publish(ActId, Msg) ->
-    _ = emqx_broker:safe_publish(Msg),
-    emqx_rule_metrics:inc_actions_success(ActId),
-    emqx_metrics:inc_msg(Msg).
-
--spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
-on_action_create_do_nothing(ActId, Params) when is_binary(ActId) ->
-    Params.
-
-on_action_do_nothing(Selected, Envs) when is_map(Selected) ->
-    emqx_rule_metrics:inc_actions_success(?bound_v('ActId', Envs)).

+ 148 - 0
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -0,0 +1,148 @@
+-module(emqx_rule_api_schema).
+
+-behaviour(hocon_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-export([ check_params/2
+        ]).
+
+-export([roots/0, fields/1]).
+
+-type tag() :: rule_creation | rule_test.
+
+-spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
+check_params(Params, Tag) ->
+    BTag = atom_to_binary(Tag),
+    try hocon_schema:check_plain(?MODULE, #{BTag => Params},
+            #{atom_key => true, nullable => true}, [BTag]) of
+        #{Tag := Checked} -> {ok, Checked}
+    catch
+        Error:Reason:ST ->
+            logger:error("check rule params failed: ~p", [{Error, Reason, ST}]),
+            {error, {Reason, ST}}
+    end.
+
+%%======================================================================================
+%% Hocon Schema Definitions
+
+roots() ->
+    [ {"rule_creation", sc(ref("rule_creation"), #{})}
+    , {"rule_test", sc(ref("rule_test"), #{})}
+    ].
+
+fields("rule_creation") ->
+    [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})}
+    , {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})}
+    , {"outputs", sc(hoconsc:array(binary()),
+          #{desc => "The outputs of the rule", default => [<<"console">>]})}
+    , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})}
+    , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})}
+    ];
+
+fields("rule_test") ->
+    [ {"context", sc(hoconsc:union([ ref("ctx_pub")
+                                   , ref("ctx_sub")
+                                   , ref("ctx_delivered")
+                                   , ref("ctx_acked")
+                                   , ref("ctx_dropped")
+                                   , ref("ctx_connected")
+                                   , ref("ctx_disconnected")
+                                   ]),
+        #{desc => "The context of the event for testing",
+          default => #{}})}
+    , {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})}
+    ];
+
+fields("ctx_pub") ->
+    [ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})}
+    , {"id", sc(binary(), #{desc => "Message ID"})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"payload", sc(binary(), #{desc => "The Message Payload"})}
+    , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})}
+    , {"topic", sc(binary(), #{desc => "Message Topic"})}
+    , {"publish_received_at", sc(integer(), #{
+        desc => "The Time that this Message is Received"})}
+    ] ++ [qos()];
+
+fields("ctx_sub") ->
+    [ {"event_type", sc(session_subscribed, #{desc => "Event Type", nullable => false})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"payload", sc(binary(), #{desc => "The Message Payload"})}
+    , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})}
+    , {"topic", sc(binary(), #{desc => "Message Topic"})}
+    , {"publish_received_at", sc(integer(), #{
+        desc => "The Time that this Message is Received"})}
+    ] ++ [qos()];
+
+fields("ctx_unsub") ->
+    [{"event_type", sc(session_unsubscribed, #{desc => "Event Type", nullable => false})}] ++
+    proplists:delete("event_type", fields("ctx_sub"));
+
+fields("ctx_delivered") ->
+    [ {"event_type", sc(message_delivered, #{desc => "Event Type", nullable => false})}
+    , {"id", sc(binary(), #{desc => "Message ID"})}
+    , {"from_clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"from_username", sc(binary(), #{desc => "The User Name"})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"payload", sc(binary(), #{desc => "The Message Payload"})}
+    , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})}
+    , {"topic", sc(binary(), #{desc => "Message Topic"})}
+    , {"publish_received_at", sc(integer(), #{
+        desc => "The Time that this Message is Received"})}
+    ] ++ [qos()];
+
+fields("ctx_acked") ->
+    [{"event_type", sc(message_acked, #{desc => "Event Type", nullable => false})}] ++
+    proplists:delete("event_type", fields("ctx_delivered"));
+
+fields("ctx_dropped") ->
+    [ {"event_type", sc(message_dropped, #{desc => "Event Type", nullable => false})}
+    , {"id", sc(binary(), #{desc => "Message ID"})}
+    , {"reason", sc(binary(), #{desc => "The Reason for Dropping"})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"payload", sc(binary(), #{desc => "The Message Payload"})}
+    , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})}
+    , {"topic", sc(binary(), #{desc => "Message Topic"})}
+    , {"publish_received_at", sc(integer(), #{
+        desc => "The Time that this Message is Received"})}
+    ] ++ [qos()];
+
+fields("ctx_connected") ->
+    [ {"event_type", sc(client_connected, #{desc => "Event Type", nullable => false})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"mountpoint", sc(binary(), #{desc => "The Mountpoint"})}
+    , {"peername", sc(binary(), #{desc => "The IP Address and Port of the Peer Client"})}
+    , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})}
+    , {"proto_name", sc(binary(), #{desc => "Protocol Name"})}
+    , {"proto_ver", sc(binary(), #{desc => "Protocol Version"})}
+    , {"keepalive", sc(integer(), #{desc => "KeepAlive"})}
+    , {"clean_start", sc(boolean(), #{desc => "Clean Start", default => true})}
+    , {"expiry_interval", sc(integer(), #{desc => "Expiry Interval"})}
+    , {"is_bridge", sc(boolean(), #{desc => "Is Bridge", default => false})}
+    , {"connected_at", sc(integer(), #{
+        desc => "The Time that this Client is Connected"})}
+    ];
+
+fields("ctx_disconnected") ->
+    [ {"event_type", sc(client_disconnected, #{desc => "Event Type", nullable => false})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"reason", sc(binary(), #{desc => "The Reason for Disconnect"})}
+    , {"peername", sc(binary(), #{desc => "The IP Address and Port of the Peer Client"})}
+    , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})}
+    , {"disconnected_at", sc(integer(), #{
+        desc => "The Time that this Client is Disconnected"})}
+    ].
+
+qos() ->
+    {"qos", sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
+        #{desc => "The Message QoS"})}.
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+ref(Field) -> hoconsc:ref(?MODULE, Field).

+ 38 - 593
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -19,627 +19,72 @@
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
 
--export([ load_providers/0
-        , unload_providers/0
-        , refresh_resources/0
-        , refresh_resource/1
-        , refresh_rule/1
-        , refresh_rules/0
-        , refresh_actions/1
-        , refresh_actions/2
-        , refresh_resource_status/0
-        ]).
-
 -export([ create_rule/1
         , update_rule/1
         , delete_rule/1
-        , create_resource/1
-        , test_resource/1
-        , start_resource/1
-        , get_resource_status/1
-        , get_resource_params/1
-        , delete_resource/1
-        , update_resource/2
         ]).
 
--export([ init_resource/4
-        , init_action/4
-        , clear_resource/3
-        , clear_rule/1
-        , clear_actions/1
-        , clear_action/3
-        ]).
+-export_type([rule/0]).
 
 -type(rule() :: #rule{}).
--type(action() :: #action{}).
--type(resource() :: #resource{}).
--type(resource_type() :: #resource_type{}).
--type(resource_params() :: #resource_params{}).
--type(action_instance_params() :: #action_instance_params{}).
-
--export_type([ rule/0
-             , action/0
-             , resource/0
-             , resource_type/0
-             , resource_params/0
-             , action_instance_params/0
-             ]).
 
 -define(T_RETRY, 60000).
 
-%%------------------------------------------------------------------------------
-%% Load resource/action providers from all available applications
-%%------------------------------------------------------------------------------
-
-%% Load all providers .
--spec(load_providers() -> ok).
-load_providers() ->
-    lists:foreach(fun(App) ->
-        load_provider(App)
-    end, ignore_lib_apps(application:loaded_applications())).
-
--spec(load_provider(App :: atom()) -> ok).
-load_provider(App) when is_atom(App) ->
-    ok = load_actions(App),
-    ok = load_resource_types(App).
-
-%%------------------------------------------------------------------------------
-%% Unload providers
-%%------------------------------------------------------------------------------
-%% Load all providers .
--spec(unload_providers() -> ok).
-unload_providers() ->
-    lists:foreach(fun(App) ->
-        unload_provider(App)
-    end, ignore_lib_apps(application:loaded_applications())).
-
-%% @doc Unload a provider.
--spec(unload_provider(App :: atom()) -> ok).
-unload_provider(App) ->
-    ok = emqx_rule_registry:remove_actions_of(App),
-    ok = emqx_rule_registry:unregister_resource_types_of(App).
-
-load_actions(App) ->
-    Actions = find_actions(App),
-    emqx_rule_registry:add_actions(Actions).
-
-load_resource_types(App) ->
-    ResourceTypes = find_resource_types(App),
-    emqx_rule_registry:register_resource_types(ResourceTypes).
-
--spec(find_actions(App :: atom()) -> list(action())).
-find_actions(App) ->
-    lists:map(fun new_action/1, find_attrs(App, rule_action)).
-
--spec(find_resource_types(App :: atom()) -> list(resource_type())).
-find_resource_types(App) ->
-    lists:map(fun new_resource_type/1, find_attrs(App, resource_type)).
-
-new_action({App, Mod, #{name := Name,
-                        for := Hook,
-                        types := Types,
-                        create := Create,
-                        params := ParamsSpec} = Params}) ->
-    ok = emqx_rule_validator:validate_spec(ParamsSpec),
-    #action{name = Name, for = Hook, app = App, types = Types,
-            category = maps:get(category, Params, other),
-            module = Mod, on_create = Create,
-            hidden = maps:get(hidden, Params, false),
-            on_destroy = maps:get(destroy, Params, undefined),
-            params_spec = ParamsSpec,
-            title = maps:get(title, Params, ?descr),
-            description = maps:get(description, Params, ?descr)}.
-
-new_resource_type({App, Mod, #{name := Name,
-                               params := ParamsSpec,
-                               create := Create} = Params}) ->
-    ok = emqx_rule_validator:validate_spec(ParamsSpec),
-    #resource_type{name = Name, provider = App,
-                   params_spec = ParamsSpec,
-                   on_create = {Mod, Create},
-                   on_status = {Mod, maps:get(status, Params, undefined)},
-                   on_destroy = {Mod, maps:get(destroy, Params, undefined)},
-                   title = maps:get(title, Params, ?descr),
-                   description = maps:get(description, Params, ?descr)}.
-
-find_attrs(App, Def) ->
-    [{App, Mod, Attr} || {ok, Modules} <- [application:get_key(App, modules)],
-                         Mod <- Modules,
-                         {Name, Attrs} <- module_attributes(Mod), Name =:= Def,
-                         Attr <- Attrs].
-
-module_attributes(Module) ->
-    try Module:module_info(attributes)
-    catch
-        error:undef -> []
-    end.
-
 %%------------------------------------------------------------------------------
 %% APIs for rules and resources
 %%------------------------------------------------------------------------------
 
--dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]).
 -spec create_rule(map()) -> {ok, rule()} | {error, term()}.
-create_rule(Params = #{rawsql := Sql, actions := ActArgs}) ->
-    case emqx_rule_sqlparser:parse_select(Sql) of
-        {ok, Select} ->
-            RuleId = maps:get(id, Params, rule_id()),
-            Enabled = maps:get(enabled, Params, true),
-            try prepare_actions(ActArgs, Enabled) of
-                Actions ->
-                    Rule = #rule{
-                        id = RuleId,
-                        rawsql = Sql,
-                        for = emqx_rule_sqlparser:select_from(Select),
-                        is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
-                        fields = emqx_rule_sqlparser:select_fields(Select),
-                        doeach = emqx_rule_sqlparser:select_doeach(Select),
-                        incase = emqx_rule_sqlparser:select_incase(Select),
-                        conditions = emqx_rule_sqlparser:select_where(Select),
-                        on_action_failed = maps:get(on_action_failed, Params, continue),
-                        actions = Actions,
-                        enabled = Enabled,
-                        created_at = erlang:system_time(millisecond),
-                        description = maps:get(description, Params, ""),
-                        state = normal
-                    },
-                    ok = emqx_rule_registry:add_rule(Rule),
-                    ok = emqx_rule_metrics:create_rule_metrics(RuleId),
-                    {ok, Rule}
-            catch
-                throw:{action_not_found, ActionName} ->
-                    {error, {action_not_found, ActionName}};
-                throw:Reason ->
-                    {error, Reason}
-            end;
-        Reason -> {error, Reason}
+create_rule(Params = #{id := RuleId}) ->
+    case emqx_rule_registry:get_rule(RuleId) of
+        not_found -> do_create_rule(Params);
+        {ok, _} -> {error, {already_exists, RuleId}}
     end.
 
--spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}).
+-spec update_rule(map()) -> {ok, rule()} | {error, term()}.
 update_rule(Params = #{id := RuleId}) ->
-    case emqx_rule_registry:get_rule(RuleId) of
-        {ok, Rule0} ->
-            try may_update_rule_params(Rule0, Params) of
-                Rule ->
-                    ok = emqx_rule_registry:add_rule(Rule),
-                    {ok, Rule}
-            catch
-                throw:Reason ->
-                    {error, Reason}
-            end;
-        not_found ->
-            {error, {not_found, RuleId}}
+    case delete_rule(RuleId) of
+        ok -> do_create_rule(Params);
+        Error -> Error
     end.
 
--spec(delete_rule(RuleId :: rule_id()) -> ok).
+-spec(delete_rule(RuleId :: rule_id()) -> ok | {error, term()}).
 delete_rule(RuleId) ->
     case emqx_rule_registry:get_rule(RuleId) of
-        {ok, Rule = #rule{actions = Actions}} ->
-            try
-                _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_rule, [Rule]),
-                ok = emqx_rule_registry:remove_rule(Rule)
-            catch
-                Error:Reason:ST ->
-                    ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]),
-                    refresh_actions(Actions)
-            end;
-        not_found ->
-            ok
-    end.
-
--spec(create_resource(#{type := _, config := _, _ => _}) -> {ok, resource()} | {error, Reason :: term()}).
-create_resource(#{type := Type, config := Config0} = Params) ->
-    case emqx_rule_registry:find_resource_type(Type) of
-        {ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} ->
-            Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
-            ResId = maps:get(id, Params, resource_id()),
-            Resource = #resource{id = ResId,
-                                 type = Type,
-                                 config = Config,
-                                 description = iolist_to_binary(maps:get(description, Params, "")),
-                                 created_at = erlang:system_time(millisecond)
-                                },
-            ok = emqx_rule_registry:add_resource(Resource),
-            %% Note that we will return OK in case of resource creation failure,
-            %% A timer is started to re-start the resource later.
-            catch _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]),
-            {ok, Resource};
-        not_found ->
-            {error, {resource_type_not_found, Type}}
-    end.
-
--spec(update_resource(resource_id(), map()) -> ok | {error, Reason :: term()}).
-update_resource(ResId, NewParams) ->
-    case emqx_rule_registry:find_enabled_rules_depends_on_resource(ResId) of
-        [] -> check_and_update_resource(ResId, NewParams);
-        Rules ->
-            {error, {dependent_rules_exists, [Id || #rule{id = Id} <- Rules]}}
-    end.
-
-check_and_update_resource(Id, NewParams) ->
-    case emqx_rule_registry:find_resource(Id) of
-        {ok, #resource{id = Id, type = Type, config = OldConfig, description = OldDescr}} ->
-            try
-                Conifg = maps:get(<<"config">>, NewParams, OldConfig),
-                Descr = maps:get(<<"description">>, NewParams, OldDescr),
-                do_check_and_update_resource(#{id => Id, config => Conifg, type => Type,
-                    description => Descr})
-            catch Error:Reason:ST ->
-                ?LOG(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]),
-                {error, Reason}
-            end;
-        _Other ->
-            {error, not_found}
-    end.
-
-do_check_and_update_resource(#{id := Id, type := Type, description := NewDescription,
-                               config := NewConfig}) ->
-    case emqx_rule_registry:find_resource_type(Type) of
-        {ok, #resource_type{on_create = {Module, Create},
-                            params_spec = ParamSpec}} ->
-            Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
-            case test_resource(#{type => Type, config => NewConfig}) of
-                ok ->
-                    _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]),
-                    emqx_rule_registry:add_resource(#resource{
-                        id = Id,
-                        type = Type,
-                        config = Config,
-                        description = NewDescription,
-                        created_at = erlang:system_time(millisecond)
-                    }),
-                    ok;
-               {error, Reason} ->
-                    error({error, Reason})
-            end
-    end.
-
--spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}).
-start_resource(ResId) ->
-    case emqx_rule_registry:find_resource(ResId) of
-        {ok, #resource{type = ResType, config = Config}} ->
-            {ok, #resource_type{on_create = {Mod, Create}}}
-                = emqx_rule_registry:find_resource_type(ResType),
-            try
-                init_resource(Mod, Create, ResId, Config),
-                refresh_actions_of_a_resource(ResId)
-            catch
-                throw:Reason -> {error, Reason}
-            end;
-        not_found ->
-            {error, {resource_not_found, ResId}}
-    end.
-
--spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
-test_resource(#{type := Type, config := Config0}) ->
-    case emqx_rule_registry:find_resource_type(Type) of
-        {ok, #resource_type{on_create = {ModC, Create},
-                            on_destroy = {ModD, Destroy},
-                            params_spec = ParamSpec}} ->
-            Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
-            ResId = resource_id(),
-            try
-                _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]),
-                _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
-                ok
-            catch
-                throw:Reason -> {error, Reason}
-            end;
-        not_found ->
-            {error, {resource_type_not_found, Type}}
-    end.
-
--spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
-get_resource_status(ResId) ->
-    case emqx_rule_registry:find_resource(ResId) of
-        {ok, #resource{type = ResType}} ->
-            {ok, #resource_type{on_status = {Mod, OnStatus}}}
-                = emqx_rule_registry:find_resource_type(ResType),
-            Status = fetch_resource_status(Mod, OnStatus, ResId),
-            {ok, Status};
-        not_found ->
-            {error, {resource_not_found, ResId}}
-    end.
-
--spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}).
-get_resource_params(ResId) ->
-     case emqx_rule_registry:find_resource_params(ResId) of
-        {ok, #resource_params{params = Params}} ->
-            {ok, Params};
-        not_found ->
-            {error, resource_not_initialized}
-    end.
-
--spec(delete_resource(resource_id()) -> ok | {error, Reason :: term()}).
-delete_resource(ResId) ->
-    case emqx_rule_registry:find_resource(ResId) of
-        {ok, #resource{type = ResType}} ->
-            {ok, #resource_type{on_destroy = {ModD, Destroy}}}
-                = emqx_rule_registry:find_resource_type(ResType),
-            try
-                case emqx_rule_registry:remove_resource(ResId) of
-                    ok ->
-                        _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
-                        ok;
-                    {error, _} = R -> R
-                end
-            catch
-                throw:Reason -> {error, Reason}
-            end;
+        {ok, Rule} ->
+            ok = emqx_rule_registry:remove_rule(Rule),
+            _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, clear_rule_metrics, [RuleId]),
+            ok;
         not_found ->
             {error, not_found}
     end.
 
-%%------------------------------------------------------------------------------
-%% Re-establish resources
-%%------------------------------------------------------------------------------
-
--spec(refresh_resources() -> ok).
-refresh_resources() ->
-    lists:foreach(fun refresh_resource/1,
-                  emqx_rule_registry:get_resources()).
-
-refresh_resource(Type) when is_atom(Type) ->
-    lists:foreach(fun refresh_resource/1,
-                  emqx_rule_registry:get_resources_by_type(Type));
-
-refresh_resource(#resource{id = ResId}) ->
-    emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY).
-
--spec(refresh_rules() -> ok).
-refresh_rules() ->
-    lists:foreach(fun
-        (#rule{enabled = true} = Rule) ->
-            try refresh_rule(Rule)
-            catch _:_ ->
-                emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup})
-            end;
-        (_) -> ok
-    end, emqx_rule_registry:get_rules()).
-
-refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) ->
-    ok = emqx_rule_metrics:create_rule_metrics(RuleId),
-    lists:foreach(fun emqx_rule_events:load/1, Topics),
-    refresh_actions(Actions).
-
--spec(refresh_resource_status() -> ok).
-refresh_resource_status() ->
-    lists:foreach(
-        fun(#resource{id = ResId, type = ResType}) ->
-            case emqx_rule_registry:find_resource_type(ResType) of
-                {ok, #resource_type{on_status = {Mod, OnStatus}}} ->
-                    _ = fetch_resource_status(Mod, OnStatus, ResId);
-                _ -> ok
-            end
-        end, emqx_rule_registry:get_resources()).
-
 %%------------------------------------------------------------------------------
 %% Internal Functions
 %%------------------------------------------------------------------------------
-prepare_actions(Actions, NeedInit) ->
-    [prepare_action(Action, NeedInit) || Action <- Actions].
-
-prepare_action(#{name := Name, args := Args0} = Action, NeedInit) ->
-    case emqx_rule_registry:find_action(Name) of
-        {ok, #action{module = Mod, on_create = Create, params_spec = ParamSpec}} ->
-            Args = emqx_rule_validator:validate_params(Args0, ParamSpec),
-            ActionInstId = maps:get(id, Action, action_instance_id(Name)),
-            case NeedInit of
-                true ->
-                    _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId,
-                            with_resource_params(Args)]),
-                    ok;
-                false -> ok
-            end,
-            #action_instance{
-                id = ActionInstId, name = Name, args = Args,
-                fallbacks = prepare_actions(maps:get(fallbacks, Action, []), NeedInit)
-            };
-        not_found ->
-            throw({action_not_found, Name})
-    end.
-
-with_resource_params(Args = #{<<"$resource">> := ResId}) ->
-    case emqx_rule_registry:find_resource_params(ResId) of
-        {ok, #resource_params{params = Params}} ->
-            maps:merge(Args, Params);
-        not_found ->
-            throw({resource_not_initialized, ResId})
-    end;
-with_resource_params(Args) -> Args.
-
--dialyzer([{nowarn_function, may_update_rule_params/2}]).
-may_update_rule_params(Rule, Params = #{rawsql := SQL}) ->
-    case emqx_rule_sqlparser:parse_select(SQL) of
+do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
+    case emqx_rule_sqlparser:parse(Sql) of
         {ok, Select} ->
-            may_update_rule_params(
-                Rule#rule{
-                    rawsql = SQL,
-                    for = emqx_rule_sqlparser:select_from(Select),
-                    is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
-                    fields = emqx_rule_sqlparser:select_fields(Select),
-                    doeach = emqx_rule_sqlparser:select_doeach(Select),
-                    incase = emqx_rule_sqlparser:select_incase(Select),
-                    conditions = emqx_rule_sqlparser:select_where(Select)
-                },
-                maps:remove(rawsql, Params));
-        Reason -> throw(Reason)
-    end;
-may_update_rule_params(Rule = #rule{enabled = OldEnb, actions = Actions, state = OldState},
-         Params = #{enabled := NewEnb}) ->
-    State = case {OldEnb, NewEnb} of
-        {false, true} ->
-            refresh_rule(Rule),
-            force_changed;
-        {true, false} ->
-            clear_actions(Actions),
-            force_changed;
-        _NoChange -> OldState
-    end,
-    may_update_rule_params(Rule#rule{enabled = NewEnb, state = State}, maps:remove(enabled, Params));
-may_update_rule_params(Rule, Params = #{description := Descr}) ->
-    may_update_rule_params(Rule#rule{description = Descr}, maps:remove(description, Params));
-may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
-    may_update_rule_params(Rule#rule{on_action_failed = OnFailed},
-        maps:remove(on_action_failed, Params));
-may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) ->
-    %% prepare new actions before removing old ones
-    NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)),
-    _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, clear_actions, [OldActions]),
-    may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
-may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
-    Rule.
-
-ignore_lib_apps(Apps) ->
-    LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
-               syntax_tools, ssl, crypto, mnesia, os_mon,
-               inets, goldrush, gproc, runtime_tools,
-               snmp, otp_mibs, public_key, asn1, ssh, hipe,
-               common_test, observer, webtool, xmerl, tools,
-               test_server, compiler, debugger, eunit, et,
-               wx],
-    [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)].
-
-resource_id() ->
-    gen_id("resource:", fun emqx_rule_registry:find_resource/1).
-
-rule_id() ->
-    gen_id("rule:", fun emqx_rule_registry:get_rule/1).
-
-gen_id(Prefix, TestFun) ->
-    Id = iolist_to_binary([Prefix, emqx_misc:gen_id()]),
-    case TestFun(Id) of
-        not_found -> Id;
-        _Res -> gen_id(Prefix, TestFun)
-    end.
-
-action_instance_id(ActionName) ->
-    iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]).
-
-init_resource(Module, OnCreate, ResId, Config) ->
-    Params = ?RAISE(Module:OnCreate(ResId, Config),
-        {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
-    ResParams = #resource_params{id = ResId,
-                                 params = Params,
-                                 status = #{is_alive => true}},
-    emqx_rule_registry:add_resource_params(ResParams).
-
-init_action(Module, OnCreate, ActionInstId, Params) ->
-    ok = emqx_rule_metrics:create_metrics(ActionInstId),
-    case ?RAISE(Module:OnCreate(ActionInstId, Params),
-                {{init_action_failure, node()},
-                 {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}) of
-        {Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2
-            ok = emqx_rule_registry:add_action_instance_params(
-                #action_instance_params{id = ActionInstId, params = NewParams, apply = Apply});
-        {Bindings, NewParams} when is_list(Bindings) ->
-            ok = emqx_rule_registry:add_action_instance_params(
-            #action_instance_params{
-                id = ActionInstId, params = NewParams,
-                apply = #{mod => Module, bindings => maps:from_list(Bindings)}});
-        Apply when is_function(Apply) -> %% BACKW: =< e4.2.2
-            ok = emqx_rule_registry:add_action_instance_params(
-                #action_instance_params{id = ActionInstId, params = Params, apply = Apply})
-    end.
-
-clear_resource(_Module, undefined, ResId) ->
-    ok = emqx_rule_registry:remove_resource_params(ResId);
-clear_resource(Module, Destroy, ResId) ->
-    case emqx_rule_registry:find_resource_params(ResId) of
-        {ok, #resource_params{params = Params}} ->
-            ?RAISE(Module:Destroy(ResId, Params),
-                   {{destroy_resource_failure, node()}, {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}),
-            ok = emqx_rule_registry:remove_resource_params(ResId);
-        not_found ->
-            ok
-    end.
-
-clear_rule(#rule{id = RuleId, actions = Actions}) ->
-    clear_actions(Actions),
-    emqx_rule_metrics:clear_rule_metrics(RuleId),
-    ok.
-
-clear_actions(Actions) ->
-    lists:foreach(
-        fun(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks}) ->
-            {ok, #action{module = Mod, on_destroy = Destory}} = emqx_rule_registry:find_action(ActName),
-            clear_action(Mod, Destory, Id),
-            clear_actions(Fallbacks)
-        end, Actions).
-
-clear_action(_Module, undefined, ActionInstId) ->
-    emqx_rule_metrics:clear_metrics(ActionInstId),
-    ok = emqx_rule_registry:remove_action_instance_params(ActionInstId);
-clear_action(Module, Destroy, ActionInstId) ->
-    case erlang:function_exported(Module, Destroy, 2) of
-        true ->
-            emqx_rule_metrics:clear_metrics(ActionInstId),
-            case emqx_rule_registry:get_action_instance_params(ActionInstId) of
-                {ok, #action_instance_params{params = Params}} ->
-                    ?RAISE(Module:Destroy(ActionInstId, Params),{{destroy_action_failure, node()},
-                                                {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}),
-                    ok = emqx_rule_registry:remove_action_instance_params(ActionInstId);
-                not_found ->
-                    ok
-            end;
-        false -> ok
-    end.
-
-fetch_resource_status(Module, OnStatus, ResId) ->
-    case emqx_rule_registry:find_resource_params(ResId) of
-        {ok, ResParams = #resource_params{params = Params, status = #{is_alive := LastIsAlive}}} ->
-            NewStatus = try
-                case Module:OnStatus(ResId, Params) of
-                    #{is_alive := LastIsAlive} = Status -> Status;
-                    #{is_alive := true} = Status ->
-                        {ok, Type} = find_type(ResId),
-                        Name = alarm_name_of_resource_down(Type, ResId),
-                        emqx_alarm:deactivate(Name),
-                        Status;
-                    #{is_alive := false} = Status ->
-                        {ok, Type} = find_type(ResId),
-                        Name = alarm_name_of_resource_down(Type, ResId),
-                        emqx_alarm:activate(Name, #{id => ResId, type => Type}),
-                        Status
-                end
-            catch _Error:Reason:STrace ->
-                ?LOG(error, "get resource status for ~p failed: ~0p", [ResId, {Reason, STrace}]),
-                #{is_alive => false}
-            end,
-            emqx_rule_registry:add_resource_params(ResParams#resource_params{status = NewStatus}),
-            NewStatus;
-        not_found ->
-            #{is_alive => false}
+            Rule = #rule{
+                id = RuleId,
+                created_at = erlang:system_time(millisecond),
+                info = #{
+                    enabled => maps:get(enabled, Params, true),
+                    sql => Sql,
+                    from => emqx_rule_sqlparser:select_from(Select),
+                    outputs => Outputs,
+                    description => maps:get(description, Params, ""),
+                    %% -- calculated fields:
+                    is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
+                    fields => emqx_rule_sqlparser:select_fields(Select),
+                    doeach => emqx_rule_sqlparser:select_doeach(Select),
+                    incase => emqx_rule_sqlparser:select_incase(Select),
+                    conditions => emqx_rule_sqlparser:select_where(Select)
+                    %% -- calculated fields end
+                }
+            },
+            ok = emqx_rule_registry:add_rule(Rule),
+            _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, create_rule_metrics, [RuleId]),
+            {ok, Rule};
+        Reason -> {error, Reason}
     end.
-
-refresh_actions_of_a_resource(ResId) ->
-    R = fun (#action_instance{args = #{<<"$resource">> := ResId0}})
-                when ResId0 =:= ResId -> true;
-            (_) -> false
-        end,
-    F = fun(#rule{actions = Actions}) -> refresh_actions(Actions, R) end,
-    lists:foreach(F, emqx_rule_registry:get_rules()).
-
-refresh_actions(Actions) ->
-    refresh_actions(Actions, fun(_) -> true end).
-refresh_actions(Actions, Pred) ->
-    lists:foreach(
-        fun(#action_instance{args = Args,
-                             id = Id, name = ActName,
-                             fallbacks = Fallbacks} = ActionInst) ->
-            case Pred(ActionInst) of
-                true ->
-                    {ok, #action{module = Mod, on_create = Create}}
-                        = emqx_rule_registry:find_action(ActName),
-                    _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]),
-                    refresh_actions(Fallbacks, Pred);
-                false -> ok
-            end
-        end, Actions).
-
-find_type(ResId) ->
-    {ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId),
-    {ok, Type}.
-
-alarm_name_of_resource_down(Type, ResId) ->
-    list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])).

+ 249 - 492
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -19,536 +19,293 @@
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-behaviour(minirest_api).
 
--rest_api(#{name   => create_rule,
-            method => 'POST',
-            path   => "/rules/",
-            func   => create_rule,
-            descr  => "Create a rule"
-           }).
+-export([api_spec/0]).
 
--rest_api(#{name   => update_rule,
-            method => 'PUT',
-            path   => "/rules/:bin:id",
-            func   => update_rule,
-            descr  => "Update a rule"
-           }).
-
--rest_api(#{name   => list_rules,
-            method => 'GET',
-            path   => "/rules/",
-            func   => list_rules,
-            descr  => "A list of all rules"
-           }).
-
--rest_api(#{name   => show_rule,
-            method => 'GET',
-            path   => "/rules/:bin:id",
-            func   => show_rule,
-            descr  => "Show a rule"
-           }).
-
--rest_api(#{name   => delete_rule,
-            method => 'DELETE',
-            path   => "/rules/:bin:id",
-            func   => delete_rule,
-            descr  => "Delete a rule"
-           }).
-
--rest_api(#{name   => list_actions,
-            method => 'GET',
-            path   => "/actions/",
-            func   => list_actions,
-            descr  => "A list of all actions"
-           }).
-
--rest_api(#{name   => show_action,
-            method => 'GET',
-            path   => "/actions/:atom:name",
-            func   => show_action,
-            descr  => "Show an action"
-           }).
-
--rest_api(#{name   => list_resources,
-            method => 'GET',
-            path   => "/resources/",
-            func   => list_resources,
-            descr  => "A list of all resources"
-           }).
-
--rest_api(#{name   => create_resource,
-            method => 'POST',
-            path   => "/resources/",
-            func   => create_resource,
-            descr  => "Create a resource"
-           }).
-
--rest_api(#{name   => update_resource,
-            method => 'PUT',
-            path   => "/resources/:bin:id",
-            func   => update_resource,
-            descr  => "Update a resource"
-           }).
-
--rest_api(#{name   => show_resource,
-            method => 'GET',
-            path   => "/resources/:bin:id",
-            func   => show_resource,
-            descr  => "Show a resource"
-           }).
-
--rest_api(#{name   => get_resource_status,
-            method => 'GET',
-            path   => "/resource_status/:bin:id",
-            func   => get_resource_status,
-            descr  => "Get status of a resource"
-           }).
-
--rest_api(#{name   => start_resource,
-            method => 'POST',
-            path   => "/resources/:bin:id",
-            func   => start_resource,
-            descr  => "Start a resource"
-           }).
-
--rest_api(#{name   => delete_resource,
-            method => 'DELETE',
-            path   => "/resources/:bin:id",
-            func   => delete_resource,
-            descr  => "Delete a resource"
-           }).
-
--rest_api(#{name   => list_resource_types,
-            method => 'GET',
-            path   => "/resource_types/",
-            func   => list_resource_types,
-            descr  => "List all resource types"
-           }).
-
--rest_api(#{name   => show_resource_type,
-            method => 'GET',
-            path   => "/resource_types/:atom:name",
-            func   => show_resource_type,
-            descr  => "Show a resource type"
-           }).
-
--rest_api(#{name   => list_resources_by_type,
-            method => 'GET',
-            path   => "/resource_types/:atom:type/resources",
-            func   => list_resources_by_type,
-            descr  => "List all resources of a resource type"
-           }).
-
--rest_api(#{name   => list_events,
-            method => 'GET',
-            path   => "/rule_events/",
-            func   => list_events,
-            descr  => "List all events with detailed info"
-           }).
-
--export([ create_rule/2
-        , update_rule/2
-        , list_rules/2
-        , show_rule/2
-        , delete_rule/2
-        ]).
-
--export([ list_actions/2
-        , show_action/2
-        ]).
-
--export([ create_resource/2
-        , list_resources/2
-        , show_resource/2
-        , get_resource_status/2
-        , start_resource/2
-        , delete_resource/2
-        , update_resource/2
+-export([ crud_rules/2
+        , list_events/2
+        , crud_rules_by_id/2
+        , rule_test/2
         ]).
 
--export([ list_resource_types/2
-        , list_resources_by_type/2
-        , show_resource_type/2
-        ]).
-
--export([list_events/2]).
-
 -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))).
--define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))).
--define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))).
--define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))).
--define(ERR_DEP_RULES_EXISTS(RULEIDS), list_to_binary(io_lib:format("Found rules ~0p depends on this resource, disable them first", [(RULEIDS)]))).
 -define(ERR_BADARGS(REASON),
         begin
-            R0 = list_to_binary(io_lib:format("~0p", [REASON])),
+            R0 = err_msg(REASON),
             <<"Bad Arguments: ", R0/binary>>
         end).
-
--dialyzer({nowarn_function, [create_rule/2,
-                             test_rule_sql/1,
-                             do_create_rule/1,
-                             update_rule/2
-                             ]}).
+-define(CHECK_PARAMS(PARAMS, TAG, EXPR),
+    case emqx_rule_api_schema:check_params(PARAMS, TAG) of
+        {ok, CheckedParams} ->
+            EXPR;
+        {error, REASON} ->
+            {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}}
+    end).
+
+api_spec() ->
+    {
+        [ api_rules_list_create()
+        , api_rules_crud()
+        , api_rule_test()
+        , api_events_list()
+        ],
+        []
+    }.
+
+api_rules_list_create() ->
+    Metadata = #{
+        get => #{
+            description => <<"List all rules">>,
+            responses => #{
+                <<"200">> =>
+                    emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}},
+        post => #{
+            description => <<"Create a new rule using given Id to all nodes in the cluster">>,
+            requestBody => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>),
+            responses => #{
+                <<"400">> =>
+                    emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
+                <<"201">> =>
+                    emqx_mgmt_util:schema(resp_schema(), <<"Create rule successfully">>)}}
+    },
+    {"/rules", Metadata, crud_rules}.
+
+api_events_list() ->
+    Metadata = #{
+        get => #{
+            description => <<"List all events can be used in rules">>,
+            responses => #{
+                <<"200">> =>
+                    emqx_mgmt_util:array_schema(resp_schema(), <<"List events successfully">>)}}
+    },
+    {"/rule_events", Metadata, list_events}.
+
+api_rules_crud() ->
+    Metadata = #{
+        get => #{
+            description => <<"Get a rule by given Id">>,
+            parameters => [param_path_id()],
+            responses => #{
+                <<"404">> =>
+                    emqx_mgmt_util:error_schema(<<"Rule not found">>, ['NOT_FOUND']),
+                <<"200">> =>
+                    emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}},
+        put => #{
+            description => <<"Create or update a rule by given Id to all nodes in the cluster">>,
+            parameters => [param_path_id()],
+            requestBody => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>),
+            responses => #{
+                <<"400">> =>
+                    emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
+                <<"200">> =>
+                    emqx_mgmt_util:schema(resp_schema(), <<"Create or update rule successfully">>)}},
+        delete => #{
+            description => <<"Delete a rule by given Id from all nodes in the cluster">>,
+            parameters => [param_path_id()],
+            responses => #{
+                <<"200">> =>
+                    emqx_mgmt_util:schema(<<"Delete rule successfully">>)}}
+    },
+    {"/rules/:id", Metadata, crud_rules_by_id}.
+
+api_rule_test() ->
+    Metadata = #{
+        post => #{
+            description => <<"Test a rule">>,
+            requestBody => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>),
+            responses => #{
+                <<"400">> =>
+                    emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
+                <<"412">> =>
+                    emqx_mgmt_util:error_schema(<<"SQL Not Match">>, ['NOT_MATCH']),
+                <<"200">> =>
+                    emqx_mgmt_util:schema(rule_test_resp_schema(), <<"Rule Test Pass">>)}}
+    },
+    {"/rule_test", Metadata, rule_test}.
+
+put_req_schema() ->
+    #{type => object,
+      properties => #{
+        sql => #{
+            description => <<"The SQL">>,
+            type => string,
+            example => <<"SELECT * from \"t/1\"">>
+        },
+        enable => #{
+            description => <<"Enable or disable the rule">>,
+            type => boolean,
+            example => true
+        },
+        outputs => #{
+            description => <<"The outputs of the rule">>,
+            type => array,
+            items => #{
+                type => string,
+                example => <<"console">>
+            }
+        },
+        description => #{
+            description => <<"The description for the rule">>,
+            type => string,
+            example => <<"A simple rule that handles MQTT messages from topic \"t/1\"">>
+        }
+      }
+    }.
+
+post_req_schema() ->
+    Req = #{properties := Prop} = put_req_schema(),
+    Req#{properties => Prop#{
+        id => #{
+            description => <<"The Id for the rule">>,
+            example => <<"my_rule">>,
+            type => string
+        }
+    }}.
+
+resp_schema() ->
+    Req = #{properties := Prop} = put_req_schema(),
+    Req#{properties => Prop#{
+        id => #{
+            description => <<"The Id for the rule">>,
+            type => string
+        },
+        created_at => #{
+            description => <<"The time that this rule was created, in rfc3339 format">>,
+            type => string,
+            example => <<"2021-09-18T13:57:29+08:00">>
+        }
+    }}.
+
+rule_test_req_schema() ->
+    #{type => object, properties => #{
+        sql => #{
+            description => <<"The SQL">>,
+            type => string,
+            example => <<"SELECT * from \"t/1\"">>
+        },
+        context => #{
+            type => object,
+            properties => #{
+                event_type => #{
+                    description => <<"Event Type">>,
+                    type => string,
+                    enum => ["message_publish", "message_acked", "message_delivered",
+                        "message_dropped", "session_subscribed", "session_unsubscribed",
+                        "client_connected", "client_disconnected"],
+                    example => <<"message_publish">>
+                },
+                clientid => #{
+                    description => <<"The Client ID">>,
+                    type => string,
+                    example => <<"\"c_emqx\"">>
+                },
+                topic => #{
+                    description => <<"The Topic">>,
+                    type => string,
+                    example => <<"t/1">>
+                }
+            }
+        }
+    }}.
+
+rule_test_resp_schema() ->
+    #{type => object}.
+
+param_path_id() ->
+    #{
+        name => id,
+        in => path,
+        schema => #{type => string},
+        required => true
+    }.
 
 %%------------------------------------------------------------------------------
 %% Rules API
 %%------------------------------------------------------------------------------
-create_rule(_Bindings, Params) ->
-    if_test(fun() -> test_rule_sql(Params) end,
-            fun() -> do_create_rule(Params) end,
-            Params).
-
-test_rule_sql(Params) ->
-    case emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of
-        {ok, Result} -> return({ok, Result});
-        {error, nomatch} -> return({error, 404, <<"SQL Not Match">>});
-        {error, Reason} ->
-            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
-            return({error, 400, ?ERR_BADARGS(Reason)})
-    end.
-
-do_create_rule(Params) ->
-    case emqx_rule_engine:create_rule(parse_rule_params(Params)) of
-        {ok, Rule} -> return({ok, record_to_map(Rule)});
-        {error, {action_not_found, ActionName}} ->
-            return({error, 400, ?ERR_NO_ACTION(ActionName)});
-        {error, Reason} ->
-            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
-            return({error, 400, ?ERR_BADARGS(Reason)})
-    end.
-
-update_rule(#{id := Id}, Params) ->
-    case emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of
-        {ok, Rule} -> return({ok, record_to_map(Rule)});
-        {error, {not_found, RuleId}} ->
-            return({error, 400, ?ERR_NO_RULE(RuleId)});
-        {error, Reason} ->
-            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
-            return({error, 400, ?ERR_BADARGS(Reason)})
-    end.
-
-list_rules(_Bindings, _Params) ->
-    return_all(emqx_rule_registry:get_rules_ordered_by_ts()).
-
-show_rule(#{id := Id}, _Params) ->
-    reply_with(fun emqx_rule_registry:get_rule/1, Id).
-
-delete_rule(#{id := Id}, _Params) ->
-    ok = emqx_rule_engine:delete_rule(Id),
-    return(ok).
-
-%%------------------------------------------------------------------------------
-%% Actions API
-%%------------------------------------------------------------------------------
 
-list_actions(#{}, _Params) ->
-    return_all(
-        sort_by_title(action,
-            emqx_rule_registry:get_actions())).
+list_events(#{}, _Params) ->
+    {200, emqx_rule_events:event_info()}.
 
-show_action(#{name := Name}, _Params) ->
-    reply_with(fun emqx_rule_registry:find_action/1, Name).
+crud_rules(get, _Params) ->
+    Records = emqx_rule_registry:get_rules_ordered_by_ts(),
+    {200, format_rule_resp(Records)};
 
-%%------------------------------------------------------------------------------
-%% Resources API
-%%------------------------------------------------------------------------------
-create_resource(#{}, Params) ->
-    case parse_resource_params(Params) of
-        {ok, ParsedParams} ->
-            if_test(fun() -> do_create_resource(test_resource, ParsedParams) end,
-                    fun() -> do_create_resource(create_resource, ParsedParams) end,
-                    Params);
+crud_rules(post, #{body := Params}) ->
+    ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:create_rule(CheckedParams) of
+        {ok, Rule} -> {201, format_rule_resp(Rule)};
         {error, Reason} ->
-            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
-            return({error, 400, ?ERR_BADARGS(Reason)})
-    end.
-
-do_create_resource(Create, ParsedParams) ->
-    case emqx_rule_engine:Create(ParsedParams) of
-        ok ->
-            return(ok);
-        {ok, Resource} ->
-            return({ok, record_to_map(Resource)});
-        {error, {resource_type_not_found, Type}} ->
-            return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
-        {error, {init_resource, _}} ->
-            return({error, 500, <<"Init resource failure!">>});
+            ?LOG(error, "create rule failed: ~0p", [Reason]),
+            {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
+    end).
+
+rule_test(post, #{body := Params}) ->
+    ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of
+        {ok, Result} -> {200, Result};
+        {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}};
         {error, Reason} ->
-            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
-            return({error, 400, ?ERR_BADARGS(Reason)})
-    end.
-
-list_resources(#{}, _Params) ->
-    Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
-    Data = lists:map(fun(Res = #{id := Id}) ->
-               Status = lists:all(fun(Node) ->
-                            case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of
-                                {ok, #resource_params{status = #{is_alive := true}}} -> true;
-                                _ -> false
-                            end
-                        end, ekka_mnesia:running_nodes()),
-               maps:put(status, Status, Res)
-           end, Data0),
-    return({ok, Data}).
-
-list_resources_by_type(#{type := Type}, _Params) ->
-    return_all(emqx_rule_registry:get_resources_by_type(Type)).
-
-show_resource(#{id := Id}, _Params) ->
-    case emqx_rule_registry:find_resource(Id) of
-        {ok, R} ->
-            Status =
-                [begin
-                    {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]),
-                    maps:put(node, Node, St)
-                end || Node <- ekka_mnesia:running_nodes()],
-            return({ok, maps:put(status, Status, record_to_map(R))});
+            ?LOG(error, "rule test failed: ~0p", [Reason]),
+            {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
+    end).
+
+crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
+    case emqx_rule_registry:get_rule(Id) of
+        {ok, Rule} ->
+            {200, format_rule_resp(Rule)};
         not_found ->
-            return({error, 404, <<"Not Found">>})
-    end.
-
-get_resource_status(#{id := Id}, _Params) ->
-    case emqx_rule_engine:get_resource_status(Id) of
-        {ok, Status} ->
-            return({ok, Status});
-        {error, {resource_not_found, ResId}} ->
-            return({error, 400, ?ERR_NO_RESOURCE(ResId)})
-    end.
-
-start_resource(#{id := Id}, _Params) ->
-    case emqx_rule_engine:start_resource(Id) of
-        ok ->
-            return(ok);
-        {error, {resource_not_found, ResId}} ->
-            return({error, 400, ?ERR_NO_RESOURCE(ResId)});
-        {error, Reason} ->
-            ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
-            return({error, 400, ?ERR_BADARGS(Reason)})
-    end.
+            {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
+    end;
 
-update_resource(#{id := Id}, NewParams) ->
-    P1 = case proplists:get_value(<<"description">>, NewParams) of
-        undefined -> #{};
-        Value -> #{<<"description">> => Value}
-    end,
-    P2 = case proplists:get_value(<<"config">>, NewParams) of
-        undefined -> #{};
-        [{}] -> #{};
-        Config -> #{<<"config">> => ?RAISE(json_term_to_map(Config), {invalid_config, Config})}
-    end,
-    case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of
-        ok ->
-            return(ok);
+crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params0}) ->
+    Params = maps:merge(Params0, #{id => Id}),
+    ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:update_rule(CheckedParams) of
+        {ok, Rule} -> {200, format_rule_resp(Rule)};
         {error, not_found} ->
-            return({error, 400, <<"Resource not found:", Id/binary>>});
-        {error, {init_resource, _}} ->
-            return({error, 500, <<"Init resource failure:", Id/binary>>});
-        {error, {dependent_rules_exists, RuleIds}} ->
-            return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)});
+            {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}};
         {error, Reason} ->
-            ?LOG(error, "Resource update failed: ~0p", [Reason]),
-            return({error, 400, ?ERR_BADARGS(Reason)})
-    end.
-
-delete_resource(#{id := Id}, _Params) ->
-    case emqx_rule_engine:delete_resource(Id) of
-        ok -> return(ok);
-        {error, not_found} -> return(ok);
-        {error, {dependent_rules_exists, RuleIds}} ->
-            return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)});
+            ?LOG(error, "update rule failed: ~0p", [Reason]),
+            {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
+    end);
+
+crud_rules_by_id(delete, #{bindings := #{id := Id}}) ->
+    case emqx_rule_engine:delete_rule(Id) of
+        ok -> {200};
+        {error, not_found} -> {200};
         {error, Reason} ->
-            return({error, 400, ?ERR_BADARGS(Reason)})
+            ?LOG(error, "delete rule failed: ~0p", [Reason]),
+            {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}}
     end.
 
-%%------------------------------------------------------------------------------
-%% Resource Types API
-%%------------------------------------------------------------------------------
-
-list_resource_types(#{}, _Params) ->
-    return_all(
-        sort_by_title(resource_type,
-            emqx_rule_registry:get_resource_types())).
-
-show_resource_type(#{name := Name}, _Params) ->
-    reply_with(fun emqx_rule_registry:find_resource_type/1, Name).
-
-
-%%------------------------------------------------------------------------------
-%% Events API
-%%------------------------------------------------------------------------------
-
-list_events(#{}, _Params) ->
-    return({ok, emqx_rule_events:event_info()}).
-
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
+err_msg(Msg) ->
+    list_to_binary(io_lib:format("~0p", [Msg])).
 
-if_test(True, False, Params) ->
-    case proplists:get_value(<<"test">>, Params) of
-        Test when Test =:= true; Test =:= <<"true">> ->
-            True();
-        _ ->
-            False()
-    end.
-
-return_all(Records) ->
-    Data = lists:foldr(fun maybe_record_to_map/2, [], Records),
-    return({ok, Data}).
 
-maybe_record_to_map(Rec, Acc) ->
-    case record_to_map(Rec) of
-        ignore -> Acc;
-        Map -> [Map | Acc]
-    end.
+format_rule_resp(Rules) when is_list(Rules) ->
+    [format_rule_resp(R) || R <- Rules];
 
-reply_with(Find, Key) ->
-    case Find(Key) of
-        {ok, R} ->
-            return({ok, record_to_map(R)});
-        not_found ->
-            return({error, 404, <<"Not Found">>})
-    end.
-
-record_to_map(#rule{id = Id,
-                    for = Hook,
-                    rawsql = RawSQL,
-                    actions = Actions,
-                    on_action_failed = OnFailed,
-                    enabled = Enabled,
-                    description = Descr}) ->
+format_rule_resp(#rule{id = Id, created_at = CreatedAt,
+    info = #{
+        from := Topics,
+        outputs := Output,
+        sql := SQL,
+        enabled := Enabled,
+        description := Descr}}) ->
     #{id => Id,
-      for => Hook,
-      rawsql => RawSQL,
-      actions => printable_actions(Actions),
-      on_action_failed => OnFailed,
+      from => Topics,
+      outputs => Output,
+      sql => SQL,
       metrics => get_rule_metrics(Id),
       enabled => Enabled,
-      description => Descr
-     };
-
-record_to_map(#action{hidden = true}) ->
-    ignore;
-record_to_map(#action{name = Name,
-                      category = Category,
-                      app = App,
-                      for = Hook,
-                      types = Types,
-                      params_spec = Params,
-                      title = Title,
-                      description = Descr}) ->
-    #{name => Name,
-      category => Category,
-      app => App,
-      for => Hook,
-      types => Types,
-      params => Params,
-      title => Title,
-      description => Descr
-     };
-
-record_to_map(#resource{id = Id,
-                        type = Type,
-                        config = Config,
-                        description = Descr}) ->
-    #{id => Id,
-      type => Type,
-      config => Config,
-      description => Descr
-     };
-
-record_to_map(#resource_type{name = Name,
-                             provider = Provider,
-                             params_spec = Params,
-                             title = Title,
-                             description = Descr}) ->
-    #{name => Name,
-      provider => Provider,
-      params => Params,
-      title => Title,
+      created_at => format_datetime(CreatedAt, millisecond),
       description => Descr
      }.
 
-printable_actions(Actions) ->
-    [#{id => Id, name => Name, params => Args,
-       metrics => get_action_metrics(Id),
-       fallbacks => printable_actions(Fallbacks)}
-     || #action_instance{id = Id, name = Name, args = Args, fallbacks = Fallbacks} <- Actions].
-
-parse_rule_params(Params) ->
-    parse_rule_params(Params, #{description => <<"">>}).
-parse_rule_params([], Rule) ->
-    Rule;
-parse_rule_params([{<<"id">>, Id} | Params], Rule) ->
-    parse_rule_params(Params, Rule#{id => Id});
-parse_rule_params([{<<"rawsql">>, RawSQL} | Params], Rule) ->
-    parse_rule_params(Params, Rule#{rawsql => RawSQL});
-parse_rule_params([{<<"enabled">>, Enabled} | Params], Rule) ->
-    parse_rule_params(Params, Rule#{enabled => enabled(Enabled)});
-parse_rule_params([{<<"on_action_failed">>, OnFailed} | Params], Rule) ->
-    parse_rule_params(Params, Rule#{on_action_failed => on_failed(OnFailed)});
-parse_rule_params([{<<"actions">>, Actions} | Params], Rule) ->
-    parse_rule_params(Params, Rule#{actions => parse_actions(Actions)});
-parse_rule_params([{<<"description">>, Descr} | Params], Rule) ->
-    parse_rule_params(Params, Rule#{description => Descr});
-parse_rule_params([_ | Params], Rule) ->
-    parse_rule_params(Params, Rule).
-
-on_failed(<<"continue">>) -> continue;
-on_failed(<<"stop">>) -> stop;
-on_failed(OnFailed) -> error({invalid_on_failed, OnFailed}).
-
-enabled(Enabled) when is_boolean(Enabled) -> Enabled;
-enabled(Enabled) -> error({invalid_enabled, Enabled}).
-
-parse_actions(Actions) ->
-    [parse_action(json_term_to_map(A)) || A <- Actions].
-
-parse_action(Action) ->
-    #{name => binary_to_existing_atom(maps:get(<<"name">>, Action), utf8),
-      args => maps:get(<<"params">>, Action, #{}),
-      fallbacks => parse_actions(maps:get(<<"fallbacks">>, Action, []))}.
-
-parse_resource_params(Params) ->
-    parse_resource_params(Params, #{config => #{}, description => <<"">>}).
-parse_resource_params([], Res) ->
-    {ok, Res};
-parse_resource_params([{<<"id">>, Id} | Params], Res) ->
-    parse_resource_params(Params, Res#{id => Id});
-parse_resource_params([{<<"type">>, ResourceType} | Params], Res) ->
-    try parse_resource_params(Params, Res#{type => binary_to_existing_atom(ResourceType, utf8)})
-    catch error:badarg ->
-        {error, {resource_type_not_found, ResourceType}}
-    end;
-parse_resource_params([{<<"config">>, Config} | Params], Res) ->
-    parse_resource_params(Params, Res#{config => json_term_to_map(Config)});
-parse_resource_params([{<<"description">>, Descr} | Params], Res) ->
-    parse_resource_params(Params, Res#{description => Descr});
-parse_resource_params([_ | Params], Res) ->
-    parse_resource_params(Params, Res).
-
-json_term_to_map(List) ->
-    emqx_json:decode(emqx_json:encode(List), [return_maps]).
-
-sort_by_title(action, Actions) ->
-    sort_by(#action.title, Actions);
-sort_by_title(resource_type, ResourceTypes) ->
-    sort_by(#resource_type.title, ResourceTypes).
-
-sort_by(Pos, TplList) ->
-    lists:sort(
-        fun(RecA, RecB) ->
-            maps:get(en, element(Pos, RecA), 0)
-            =< maps:get(en, element(Pos, RecB), 0)
-        end, TplList).
+format_datetime(Timestamp, Unit) ->
+    list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
 
 get_rule_metrics(Id) ->
     [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
      || Node <- ekka_mnesia:running_nodes()].
-
-get_action_metrics(Id) ->
-    [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
-     || Node <- ekka_mnesia:running_nodes()].
-
-%%    TODO: V5 API
-return(_) -> ok.

+ 1 - 7
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -23,13 +23,7 @@
 -export([stop/1]).
 
 start(_Type, _Args) ->
-    {ok, Sup} = emqx_rule_engine_sup:start_link(),
-    _ = emqx_rule_engine_sup:start_locker(),
-    ok = emqx_rule_engine:load_providers(),
-    ok = emqx_rule_engine:refresh_resources(),
-    ok = emqx_rule_engine:refresh_rules(),
-    ok = emqx_rule_engine_cli:load(),
-    {ok, Sup}.
+    emqx_rule_engine_sup:start_link().
 
 stop(_State) ->
     ok = emqx_rule_events:unload(),

+ 1 - 21
apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl

@@ -22,17 +22,12 @@
 
 -export([start_link/0]).
 
--export([start_locker/0]).
-
 -export([init/1]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    Opts = [public, named_table, set, {read_concurrency, true}],
-    _ = ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
-    _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
     Registry = #{id => emqx_rule_registry,
                  start => {emqx_rule_registry, start_link, []},
                  restart => permanent,
@@ -45,19 +40,4 @@ init([]) ->
                 shutdown => 5000,
                 type => worker,
                 modules => [emqx_rule_metrics]},
-    Monitor = #{id => emqx_rule_monitor,
-                start => {emqx_rule_monitor, start_link, []},
-                restart => permanent,
-                shutdown => 5000,
-                type => worker,
-                modules => [emqx_rule_monitor]},
-    {ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}.
-
-start_locker() ->
-    Locker = #{id => emqx_rule_locker,
-               start => {emqx_rule_locker, start_link, []},
-               restart => permanent,
-               shutdown => 5000,
-               type => worker,
-               modules => [emqx_rule_locker]},
-    supervisor:start_child(?MODULE, Locker).
+    {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.

+ 3 - 4
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -79,10 +79,9 @@ unload(Topic) ->
 %%--------------------------------------------------------------------
 on_message_publish(Message = #message{topic = Topic}, _Env) ->
     case ignore_sys_message(Message) of
-        true ->
-            ok;
+        true -> ok;
         false ->
-            case emqx_rule_registry:get_rules_for(Topic) of
+            case emqx_rule_registry:get_rules_for_topic(Topic) of
                 [] -> ok;
                 Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
             end
@@ -297,7 +296,7 @@ with_basic_columns(EventName, Data) when is_map(Data) ->
 %%--------------------------------------------------------------------
 apply_event(EventName, GenEventMsg, _Env) ->
     EventTopic = event_topic(EventName),
-    case emqx_rule_registry:get_rules_for(EventTopic) of
+    case emqx_rule_registry:get_rules_for_topic(EventTopic) of
         [] -> ok;
         Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
     end.

+ 20 - 0
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -17,6 +17,8 @@
 -module(emqx_rule_funcs).
 
 -include("rule_engine.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 %% IoT Funcs
 -export([ msgid/0
@@ -36,6 +38,8 @@
         , contains_topic_match/2
         , contains_topic_match/3
         , null/0
+        , republish/3
+        , republish/4
         ]).
 
 %% Arithmetic Funcs
@@ -305,6 +309,22 @@ find_topic_filter(Filter, TopicFilters, Func) ->
 null() ->
     undefined.
 
+republish(Topic, Payload, Qos) ->
+    republish(Topic, Payload, Qos, false).
+
+republish(Topic, Payload, Qos, Retain) ->
+    Msg = #message{
+            id = emqx_guid:gen(),
+            qos = Qos,
+            from = republish_function,
+            flags = #{retain => Retain},
+            headers = #{},
+            topic = Topic,
+            payload = Payload,
+            timestamp = erlang:system_time(millisecond)
+        },
+    emqx_broker:safe_publish(Msg).
+
 %%------------------------------------------------------------------------------
 %% Arithmetic Funcs
 %%------------------------------------------------------------------------------

+ 25 - 151
apps/emqx_rule_engine/src/emqx_rule_metrics.erl

@@ -26,42 +26,21 @@
         ]).
 
 -export([ get_rules_matched/1
-        , get_actions_taken/1
-        , get_actions_success/1
-        , get_actions_error/1
-        , get_actions_exception/1
-        , get_actions_retry/1
         ]).
 
 -export([ inc_rules_matched/1
         , inc_rules_matched/2
-        , inc_actions_taken/1
-        , inc_actions_taken/2
-        , inc_actions_success/1
-        , inc_actions_success/2
-        , inc_actions_error/1
-        , inc_actions_error/2
-        , inc_actions_exception/1
-        , inc_actions_exception/2
-        , inc_actions_retry/1
-        , inc_actions_retry/2
         ]).
 
 -export([ inc/2
         , inc/3
         , get/2
-        , get_overall/1
         , get_rule_speed/1
-        , get_overall_rule_speed/0
         , create_rule_metrics/1
-        , create_metrics/1
         , clear_rule_metrics/1
-        , clear_metrics/1
-        , overall_metrics/0
         ]).
 
 -export([ get_rule_metrics/1
-        , get_action_metrics/1
         ]).
 
 %% gen_server callbacks
@@ -82,7 +61,7 @@
 -define(SAMPLING, 1).
 -endif.
 
--define(CRefID(ID), {?MODULE, ID}).
+-define(CntrRef, ?MODULE).
 -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
 
 -record(rule_speed, {
@@ -99,48 +78,32 @@
 
 -record(state, {
             metric_ids = sets:new(),
-            rule_speeds :: undefined | #{rule_id() => #rule_speed{}},
-            overall_rule_speed :: #rule_speed{}
+            rule_speeds :: undefined | #{rule_id() => #rule_speed{}}
         }).
 
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
+
 -spec(create_rule_metrics(rule_id()) -> ok).
 create_rule_metrics(Id) ->
     gen_server:call(?MODULE, {create_rule_metrics, Id}).
 
--spec(create_metrics(rule_id()) -> ok).
-create_metrics(Id) ->
-    gen_server:call(?MODULE, {create_metrics, Id}).
-
 -spec(clear_rule_metrics(rule_id()) -> ok).
 clear_rule_metrics(Id) ->
     gen_server:call(?MODULE, {delete_rule_metrics, Id}).
 
--spec(clear_metrics(rule_id()) -> ok).
-clear_metrics(Id) ->
-    gen_server:call(?MODULE, {delete_metrics, Id}).
-
 -spec(get(rule_id(), atom()) -> number()).
 get(Id, Metric) ->
-    case couters_ref(Id) of
+    case get_couters_ref(Id) of
         not_found -> 0;
         Ref -> counters:get(Ref, metrics_idx(Metric))
     end.
 
--spec(get_overall(atom()) -> number()).
-get_overall(Metric) ->
-    emqx_metrics:val(Metric).
-
 -spec(get_rule_speed(rule_id()) -> map()).
 get_rule_speed(Id) ->
     gen_server:call(?MODULE, {get_rule_speed, Id}).
 
--spec(get_overall_rule_speed() -> map()).
-get_overall_rule_speed() ->
-    gen_server:call(?MODULE, get_overall_rule_speed).
-
 -spec(get_rule_metrics(rule_id()) -> map()).
 get_rule_metrics(Id) ->
     #{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id),
@@ -150,95 +113,39 @@ get_rule_metrics(Id) ->
       speed_last5m => Last5M
     }.
 
--spec(get_action_metrics(action_instance_id()) -> map()).
-get_action_metrics(Id) ->
-    #{success => get_actions_success(Id),
-      failed => get_actions_error(Id) + get_actions_exception(Id),
-      taken => get_actions_taken(Id)
-     }.
-
 -spec inc(rule_id(), atom()) -> ok.
 inc(Id, Metric) ->
     inc(Id, Metric, 1).
 
 -spec inc(rule_id(), atom(), pos_integer()) -> ok.
 inc(Id, Metric, Val) ->
-    case couters_ref(Id) of
+    case get_couters_ref(Id) of
         not_found ->
             %% this may occur when increasing a counter for
             %% a rule that was created from a remove node.
-            case atom_to_list(Metric) of
-                "rules." ++ _ -> create_rule_metrics(Id);
-                _ -> create_metrics(Id)
-            end,
-            counters:add(couters_ref(Id), metrics_idx(Metric), Val);
+            create_rule_metrics(Id),
+            counters:add(get_couters_ref(Id), metrics_idx(Metric), Val);
         Ref ->
             counters:add(Ref, metrics_idx(Metric), Val)
-    end,
-    inc_overall(Metric, Val).
-
--spec(inc_overall(atom(), pos_integer()) -> ok).
-inc_overall(Metric, Val) ->
-    emqx_metrics:inc(Metric, Val).
+    end.
 
 inc_rules_matched(Id) ->
     inc_rules_matched(Id, 1).
 inc_rules_matched(Id, Val) ->
     inc(Id, 'rules.matched', Val).
 
-inc_actions_taken(Id) ->
-    inc_actions_taken(Id, 1).
-inc_actions_taken(Id, Val) ->
-    inc(Id, 'actions.taken', Val).
-
-inc_actions_success(Id) ->
-    inc_actions_success(Id, 1).
-inc_actions_success(Id, Val) ->
-    inc(Id, 'actions.success', Val).
-
-inc_actions_error(Id) ->
-    inc_actions_error(Id, 1).
-inc_actions_error(Id, Val) ->
-    inc(Id, 'actions.error', Val).
-
-inc_actions_exception(Id) ->
-    inc_actions_exception(Id, 1).
-inc_actions_exception(Id, Val) ->
-    inc(Id, 'actions.exception', Val).
-
-inc_actions_retry(Id) ->
-    inc_actions_retry(Id, 1).
-inc_actions_retry(Id, Val) ->
-    inc(Id, 'actions.retry', Val).
-
 get_rules_matched(Id) ->
     get(Id, 'rules.matched').
 
-get_actions_taken(Id) ->
-    get(Id, 'actions.taken').
-
-get_actions_success(Id) ->
-    get(Id, 'actions.success').
-
-get_actions_error(Id) ->
-    get(Id, 'actions.error').
-
-get_actions_exception(Id) ->
-    get(Id, 'actions.exception').
-
-get_actions_retry(Id) ->
-    get(Id, 'actions.retry').
-
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 init([]) ->
     erlang:process_flag(trap_exit, true),
-    %% the overall counters
-    [ok = emqx_metrics:ensure(Metric)|| Metric <- overall_metrics()],
     %% the speed metrics
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
-    {ok, #state{overall_rule_speed = #rule_speed{}}}.
+    persistent_term:put(?CntrRef, #{}),
+    {ok, #state{}}.
 
 handle_call({get_rule_speed, _Id}, _From, State = #state{rule_speeds = undefined}) ->
     {reply, format_rule_speed(#rule_speed{}), State};
@@ -248,12 +155,6 @@ handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds
                 Speed -> format_rule_speed(Speed)
             end, State};
 
-handle_call(get_overall_rule_speed, _From, State = #state{overall_rule_speed = RuleSpeed}) ->
-    {reply, format_rule_speed(RuleSpeed), State};
-
-handle_call({create_metrics, Id}, _From, State = #state{metric_ids = MIDs}) ->
-    {reply, create_counters(Id), State#state{metric_ids = sets:add_element(Id, MIDs)}};
-
 handle_call({create_rule_metrics, Id}, _From,
             State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
     {reply, create_counters(Id),
@@ -263,10 +164,6 @@ handle_call({create_rule_metrics, Id}, _From,
                                     _ -> RuleSpeeds#{Id => #rule_speed{}}
                                 end}};
 
-handle_call({delete_metrics, Id}, _From,
-            State = #state{metric_ids = MIDs, rule_speeds = undefined}) ->
-    {reply, delete_counters(Id), State#state{metric_ids = sets:del_element(Id, MIDs)}};
-
 handle_call({delete_rule_metrics, Id}, _From,
             State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
     {reply, delete_counters(Id),
@@ -283,21 +180,16 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info(ticking, State = #state{rule_speeds = undefined}) ->
-    async_refresh_resource_status(),
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
     {noreply, State};
 
-handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0,
-                                       overall_rule_speed = OverallRuleSpeed0}) ->
+handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) ->
     RuleSpeeds = maps:map(
                     fun(Id, RuleSpeed) ->
                         calculate_speed(get_rules_matched(Id), RuleSpeed)
                     end, RuleSpeeds0),
-    OverallRuleSpeed = calculate_speed(get_overall('rules.matched'), OverallRuleSpeed0),
-    async_refresh_resource_status(),
     erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
-    {noreply, State#state{rule_speeds = RuleSpeeds,
-                          overall_rule_speed = OverallRuleSpeed}};
+    {noreply, State#state{rule_speeds = RuleSpeeds}};
 
 handle_info(_Info, State) ->
     {noreply, State}.
@@ -307,7 +199,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 terminate(_Reason, #state{metric_ids = MIDs}) ->
     [delete_counters(Id) || Id <- sets:to_list(MIDs)],
-    persistent_term:erase(?MODULE).
+    persistent_term:erase(?CntrRef).
 
 stop() ->
     gen_server:stop(?MODULE).
@@ -316,26 +208,22 @@ stop() ->
 %% Internal Functions
 %%------------------------------------------------------------------------------
 
-async_refresh_resource_status() ->
-    spawn(emqx_rule_engine, refresh_resource_status, []).
-
 create_counters(Id) ->
-    case couters_ref(Id) of
+    case get_couters_ref(Id) of
         not_found ->
-            ok = persistent_term:put(?CRefID(Id),
-                    counters:new(max_counters_size(), [write_concurrency]));
+            CntrRef = counters:new(max_counters_size(), [write_concurrency]),
+            persistent_term:put(?CntrRef, #{Id => CntrRef});
         _Ref -> ok
     end.
 
 delete_counters(Id) ->
-    persistent_term:erase(?CRefID(Id)),
-    ok.
+    persistent_term:put(?CntrRef, maps:remove(Id, get_all_counters())).
 
-couters_ref(Id) ->
-    try persistent_term:get(?CRefID(Id))
-    catch
-        error:badarg -> not_found
-    end.
+get_couters_ref(Id) ->
+    maps:get(Id, get_all_counters(), not_found).
+
+get_all_counters() ->
+    persistent_term:get(?CntrRef, #{}).
 
 calculate_speed(_CurrVal, undefined) ->
     undefined;
@@ -379,21 +267,7 @@ precision(Float, N) ->
 %% Metrics Definitions
 %%------------------------------------------------------------------------------
 
-max_counters_size() -> 7.
-
+max_counters_size() -> 2.
 metrics_idx('rules.matched') ->       1;
-metrics_idx('actions.success') ->     2;
-metrics_idx('actions.error') ->       3;
-metrics_idx('actions.taken') ->       4;
-metrics_idx('actions.exception') ->   5;
-metrics_idx('actions.retry') ->       6;
-metrics_idx(_) ->                     7.
-
-overall_metrics() ->
-    [ 'rules.matched'
-    , 'actions.success'
-    , 'actions.error'
-    , 'actions.taken'
-    , 'actions.exception'
-    , 'actions.retry'
-    ].
+metrics_idx(_) ->                     2.
+

+ 0 - 126
apps/emqx_rule_engine/src/emqx_rule_monitor.erl

@@ -1,126 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_rule_monitor).
-
--behavior(gen_server).
-
--include("rule_engine.hrl").
--include_lib("emqx/include/logger.hrl").
-
--export([init/1,
-         handle_call/3,
-         handle_cast/2,
-         handle_info/2,
-         terminate/2,
-         code_change/3]).
-
--export([ start_link/0
-        , stop/0
-        , ensure_resource_retrier/2
-        , retry_loop/3
-        ]).
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-stop() ->
-    gen_server:stop(?MODULE).
-
-init([]) ->
-    _ = erlang:process_flag(trap_exit, true),
-    {ok, #{retryers => #{}}}.
-
-ensure_resource_retrier(ResId, Interval) ->
-    gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
-
-handle_call(_Msg, _From, State) ->
-    {reply, ok, State}.
-
-handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
-    Objects = maps:get(Tag, State, #{}),
-    NewState = case maps:find(Obj, Objects) of
-        error ->
-            update_object(Tag, Obj,
-                create_restart_handler(Tag, Obj, Interval), State);
-        {ok, _Pid} ->
-            State
-    end,
-    {noreply, NewState};
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) ->
-    case maps:take(Pid, Retryers) of
-        {{Tag, Obj}, Retryers2} ->
-            Objects = maps:get(Tag, State, #{}),
-            {noreply, State#{Tag => maps:remove(Obj, Objects),
-                             retryers => Retryers2}};
-        error ->
-            ?LOG(error, "got unexpected proc down: ~p ~p", [Pid, Reason]),
-            {noreply, State}
-    end;
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-update_object(Tag, Obj, Retryer, State) ->
-    Objects = maps:get(Tag, State, #{}),
-    Retryers = maps:get(retryers, State, #{}),
-    State#{
-        Tag => Objects#{Obj => Retryer},
-        retryers => Retryers#{Retryer => {Tag, Obj}}
-    }.
-
-create_restart_handler(Tag, Obj, Interval) ->
-    ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]),
-    %% spawn a dedicated process to handle the restarting asynchronously
-    spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
-
-retry_loop(resource, ResId, Interval) ->
-    case emqx_rule_registry:find_resource(ResId) of
-        {ok, #resource{type = Type, config = Config}} ->
-            try
-                {ok, #resource_type{on_create = {M, F}}} =
-                    emqx_rule_registry:find_resource_type(Type),
-                ok = emqx_rule_engine:init_resource(M, F, ResId, Config),
-                refresh_and_enable_rules_of_resource(ResId)
-            catch
-                Err:Reason:ST ->
-                    ?LOG(warning, "init_resource failed: ~p, ~0p",
-                        [{Err, Reason}, ST]),
-                    timer:sleep(Interval),
-                    ?MODULE:retry_loop(resource, ResId, Interval)
-            end;
-        not_found ->
-            ok
-    end.
-
-refresh_and_enable_rules_of_resource(ResId) ->
-    lists:foreach(
-        fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) ->
-                emqx_rule_engine:refresh_rule(Rule),
-                emqx_rule_registry:add_rule(Rule#rule{enabled = true, state = normal}),
-                ?LOG(info, "rule ~s is refreshed and re-enabled", [Id]);
-            (_) -> ok
-        end, emqx_rule_registry:find_rules_depends_on_resource(ResId)).

+ 12 - 14
apps/emqx_rule_engine/src/emqx_rule_locker.erl

@@ -14,21 +14,19 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_rule_locker).
+%% Define the default actions.
+-module(emqx_rule_outputs).
+-include_lib("emqx/include/logger.hrl").
 
--export([start_link/0]).
-
--export([ lock/1
-        , unlock/1
+-export([ console/2
+        , get_selected_data/2
         ]).
 
-start_link() ->
-    ekka_locker:start_link(?MODULE).
-
--spec(lock(binary()) -> ekka_locker:lock_result()).
-lock(Id) ->
-    ekka_locker:acquire(?MODULE, Id, local).
+-spec console(map(), map()) -> any().
+console(Selected, #{metadata := #{rule_id := RuleId}} = Envs) ->
+    ?ULOG("[rule output] ~s~n"
+          "\tOutput Data: ~p~n"
+          "\tEnvs: ~p~n", [RuleId, Selected, Envs]).
 
--spec(unlock(binary()) -> {boolean(), [node()]}).
-unlock(Id) ->
-    ekka_locker:release(?MODULE, Id, local).
+get_selected_data(Selected, _Envs) ->
+     Selected.

+ 18 - 279
apps/emqx_rule_engine/src/emqx_rule_registry.erl

@@ -27,7 +27,7 @@
 
 %% Rule Management
 -export([ get_rules/0
-        , get_rules_for/1
+        , get_rules_for_topic/1
         , get_rules_with_same_event/1
         , get_rules_ordered_by_ts/0
         , get_rule/1
@@ -37,39 +37,6 @@
         , remove_rules/1
         ]).
 
-%% Action Management
--export([ add_action/1
-        , add_actions/1
-        , get_actions/0
-        , find_action/1
-        , remove_action/1
-        , remove_actions/1
-        , remove_actions_of/1
-        , add_action_instance_params/1
-        , get_action_instance_params/1
-        , remove_action_instance_params/1
-        ]).
-
-%% Resource Management
--export([ get_resources/0
-        , add_resource/1
-        , add_resource_params/1
-        , find_resource/1
-        , find_resource_params/1
-        , get_resources_by_type/1
-        , remove_resource/1
-        , remove_resource_params/1
-        ]).
-
-%% Resource Types
--export([ get_resource_types/0
-        , find_resource_type/1
-        , find_rules_depends_on_resource/1
-        , find_enabled_rules_depends_on_resource/1
-        , register_resource_types/1
-        , unregister_resource_types_of/1
-        ]).
-
 -export([ load_hooks_for_rule/1
         , unload_hooks_for_rule/1
         ]).
@@ -110,53 +77,15 @@ mnesia(boot) ->
                 {rlog_shard, ?RULE_ENGINE_SHARD},
                 {disc_copies, [node()]},
                 {record_name, rule},
-                {index, [#rule.for]},
                 {attributes, record_info(fields, rule)},
-                {storage_properties, StoreProps}]),
-    %% Rule action table
-    ok = ekka_mnesia:create_table(?ACTION_TAB, [
-                {rlog_shard, ?RULE_ENGINE_SHARD},
-                {ram_copies, [node()]},
-                {record_name, action},
-                {index, [#action.for, #action.app]},
-                {attributes, record_info(fields, action)},
-                {storage_properties, StoreProps}]),
-    %% Resource table
-    ok = ekka_mnesia:create_table(?RES_TAB, [
-                {rlog_shard, ?RULE_ENGINE_SHARD},
-                {disc_copies, [node()]},
-                {record_name, resource},
-                {index, [#resource.type]},
-                {attributes, record_info(fields, resource)},
-                {storage_properties, StoreProps}]),
-    %% Resource type table
-    ok = ekka_mnesia:create_table(?RES_TYPE_TAB, [
-                {rlog_shard, ?RULE_ENGINE_SHARD},
-                {ram_copies, [node()]},
-                {record_name, resource_type},
-                {index, [#resource_type.provider]},
-                {attributes, record_info(fields, resource_type)},
                 {storage_properties, StoreProps}]);
 
 mnesia(copy) ->
     %% Copy rule table
-    ok = ekka_mnesia:copy_table(?RULE_TAB, disc_copies),
-    %% Copy rule action table
-    ok = ekka_mnesia:copy_table(?ACTION_TAB, ram_copies),
-    %% Copy resource table
-    ok = ekka_mnesia:copy_table(?RES_TAB, disc_copies),
-    %% Copy resource type table
-    ok = ekka_mnesia:copy_table(?RES_TYPE_TAB, ram_copies).
+    ok = ekka_mnesia:copy_table(?RULE_TAB, disc_copies).
 
 dump() ->
-    ?ULOG("Rules: ~p~n"
-          "ActionInstParams: ~p~n"
-          "Resources: ~p~n"
-          "ResourceParams: ~p~n",
-          [ets:tab2list(?RULE_TAB),
-           ets:tab2list(?ACTION_INST_PARAMS_TAB),
-           ets:tab2list(?RES_TAB),
-           ets:tab2list(?RES_PARAMS_TAB)]).
+    ?ULOG("Rules: ~p~n", [ets:tab2list(?RULE_TAB)]).
 
 %%------------------------------------------------------------------------------
 %% Start the registry
@@ -182,16 +111,16 @@ get_rules_ordered_by_ts() ->
     {atomic, List} = ekka_mnesia:transaction(?RULE_ENGINE_SHARD, F),
     List.
 
--spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
-get_rules_for(Topic) ->
-    [Rule || Rule = #rule{for = For} <- get_rules(),
-             emqx_plugin_libs_rule:can_topic_match_oneof(Topic, For)].
+-spec(get_rules_for_topic(Topic :: binary()) -> list(emqx_rule_engine:rule())).
+get_rules_for_topic(Topic) ->
+    [Rule || Rule = #rule{info = #{from := From}} <- get_rules(),
+             emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)].
 
 -spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
 get_rules_with_same_event(Topic) ->
     EventName = emqx_rule_events:event_name(Topic),
-    [Rule || Rule = #rule{for = For} <- get_rules(),
-             lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)].
+    [Rule || Rule = #rule{info = #{from := From}} <- get_rules(),
+             lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)].
 
 is_of_event_name(EventName, Topic) ->
     EventName =:= emqx_rule_events:event_name(Topic).
@@ -223,7 +152,7 @@ remove_rules(Rules) ->
 
 insert_rules([]) -> ok;
 insert_rules(Rules) ->
-    _ =  emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rules]),
+    _ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rules]),
     [mnesia:write(?RULE_TAB, Rule, write) ||Rule <- Rules].
 
 %% @private
@@ -235,7 +164,7 @@ delete_rules(Rules = [R|_]) when is_binary(R) ->
                 {ok, Rule} ->  [Rule|Acc];
                 not_found -> Acc
             end
-                    end, [], Rules),
+        end, [], Rules),
     delete_rules_unload_hooks(RuleRecs);
 delete_rules(Rules = [Rule|_]) when is_record(Rule, rule) ->
     delete_rules_unload_hooks(Rules).
@@ -245,209 +174,20 @@ delete_rules_unload_hooks(Rules) ->
     [mnesia:delete_object(?RULE_TAB, Rule, write) ||Rule <- Rules].
 
 load_hooks_for_rule(Rules) ->
-    lists:foreach(fun(#rule{for = Topics}) ->
-        lists:foreach(fun emqx_rule_events:load/1, Topics)
-                  end, Rules).
+    lists:foreach(fun(#rule{info = #{from := Topics}}) ->
+            lists:foreach(fun emqx_rule_events:load/1, Topics)
+        end, Rules).
 
 unload_hooks_for_rule(Rules) ->
-    lists:foreach(fun(#rule{id = Id, for = Topics}) ->
+    lists:foreach(fun(#rule{id = Id, info = #{from := Topics}}) ->
         lists:foreach(fun(Topic) ->
             case get_rules_with_same_event(Topic) of
-                [#rule{id = Id}] -> %% we are now deleting the last rule
+                [#rule{id = Id0}] when Id0 == Id -> %% we are now deleting the last rule
                     emqx_rule_events:unload(Topic);
                 _ -> ok
             end
-                      end, Topics)
-                  end, Rules).
-
-%%------------------------------------------------------------------------------
-%% Action Management
-%%------------------------------------------------------------------------------
-
-%% @doc Get all actions.
--spec(get_actions() -> list(emqx_rule_engine:action())).
-get_actions() ->
-    get_all_records(?ACTION_TAB).
-
-%% @doc Find an action by name.
--spec(find_action(Name :: action_name()) -> {ok, emqx_rule_engine:action()} | not_found).
-find_action(Name) ->
-    case mnesia:dirty_read(?ACTION_TAB, Name) of
-        [Action] -> {ok, Action};
-        [] -> not_found
-    end.
-
-%% @doc Add an action.
--spec(add_action(emqx_rule_engine:action()) -> ok).
-add_action(Action) when is_record(Action, action) ->
-    trans(fun insert_action/1, [Action]).
-
-%% @doc Add actions.
--spec(add_actions(list(emqx_rule_engine:action())) -> ok).
-add_actions(Actions) when is_list(Actions) ->
-    trans(fun lists:foreach/2, [fun insert_action/1, Actions]).
-
-%% @doc Remove an action.
--spec(remove_action(emqx_rule_engine:action() | atom()) -> ok).
-remove_action(Action) when is_record(Action, action) ->
-    trans(fun delete_action/1, [Action]);
-
-remove_action(Name) ->
-    trans(fun mnesia:delete/1, [{?ACTION_TAB, Name}]).
-
-%% @doc Remove actions.
--spec(remove_actions(list(emqx_rule_engine:action())) -> ok).
-remove_actions(Actions) ->
-    trans(fun lists:foreach/2, [fun delete_action/1, Actions]).
-
-%% @doc Remove actions of the App.
--spec(remove_actions_of(App :: atom()) -> ok).
-remove_actions_of(App) ->
-    trans(fun() ->
-            lists:foreach(fun delete_action/1, mnesia:index_read(?ACTION_TAB, App, #action.app))
-          end).
-
-%% @private
-insert_action(Action) ->
-    mnesia:write(?ACTION_TAB, Action, write).
-
-%% @private
-delete_action(Action) when is_record(Action, action) ->
-    mnesia:delete_object(?ACTION_TAB, Action, write);
-delete_action(Name) when is_atom(Name) ->
-    mnesia:delete(?ACTION_TAB, Name, write).
-
-%% @doc Add an action instance params.
--spec(add_action_instance_params(emqx_rule_engine:action_instance_params()) -> ok).
-add_action_instance_params(ActionInstParams) when is_record(ActionInstParams, action_instance_params) ->
-    ets:insert(?ACTION_INST_PARAMS_TAB, ActionInstParams),
-    ok.
-
--spec(get_action_instance_params(action_instance_id()) -> {ok, emqx_rule_engine:action_instance_params()} | not_found).
-get_action_instance_params(ActionInstId) ->
-    case ets:lookup(?ACTION_INST_PARAMS_TAB, ActionInstId) of
-        [ActionInstParams] -> {ok, ActionInstParams};
-        [] -> not_found
-    end.
-
-%% @doc Delete an action instance params.
--spec(remove_action_instance_params(action_instance_id()) -> ok).
-remove_action_instance_params(ActionInstId) ->
-    ets:delete(?ACTION_INST_PARAMS_TAB, ActionInstId),
-    ok.
-
-%%------------------------------------------------------------------------------
-%% Resource Management
-%%------------------------------------------------------------------------------
-
--spec(get_resources() -> list(emqx_rule_engine:resource())).
-get_resources() ->
-    get_all_records(?RES_TAB).
-
--spec(add_resource(emqx_rule_engine:resource()) -> ok).
-add_resource(Resource) when is_record(Resource, resource) ->
-    trans(fun insert_resource/1, [Resource]).
-
--spec(add_resource_params(emqx_rule_engine:resource_params()) -> ok).
-add_resource_params(ResParams) when is_record(ResParams, resource_params) ->
-    ets:insert(?RES_PARAMS_TAB, ResParams),
-    ok.
-
--spec(find_resource(Id :: resource_id()) -> {ok, emqx_rule_engine:resource()} | not_found).
-find_resource(Id) ->
-    case mnesia:dirty_read(?RES_TAB, Id) of
-        [Res] -> {ok, Res};
-        [] -> not_found
-    end.
-
--spec(find_resource_params(Id :: resource_id())
-        -> {ok, emqx_rule_engine:resource_params()} | not_found).
-find_resource_params(Id) ->
-    case ets:lookup(?RES_PARAMS_TAB, Id) of
-        [ResParams] -> {ok, ResParams};
-        [] -> not_found
-    end.
-
--spec(remove_resource(emqx_rule_engine:resource() | emqx_rule_engine:resource_id()) -> ok | {error, term()}).
-remove_resource(Resource) when is_record(Resource, resource) ->
-    trans(fun delete_resource/1, [Resource#resource.id]);
-
-remove_resource(ResId) when is_binary(ResId) ->
-    trans(fun delete_resource/1, [ResId]).
-
--spec(remove_resource_params(emqx_rule_engine:resource_id()) -> ok).
-remove_resource_params(ResId) ->
-    ets:delete(?RES_PARAMS_TAB, ResId),
-    ok.
-
-%% @private
-delete_resource(ResId) ->
-    case find_enabled_rules_depends_on_resource(ResId) of
-        [] -> mnesia:delete(?RES_TAB, ResId, write);
-        Rules ->
-            {error, {dependent_rules_exists, [Id || #rule{id = Id} <- Rules]}}
-    end.
-
-%% @private
-insert_resource(Resource) ->
-    mnesia:write(?RES_TAB, Resource, write).
-
-find_enabled_rules_depends_on_resource(ResId) ->
-    [R || #rule{enabled = true} = R <- find_rules_depends_on_resource(ResId)].
-
-find_rules_depends_on_resource(ResId) ->
-    lists:foldl(fun(#rule{actions = Actions} = R, Rules) ->
-        case search_action_despends_on_resource(ResId, Actions) of
-            false -> Rules;
-            {value, _} -> [R | Rules]
-        end
-    end, [], get_rules()).
-
-search_action_despends_on_resource(ResId, Actions) ->
-    lists:search(fun
-        (#action_instance{args = #{<<"$resource">> := ResId0}}) ->
-            ResId0 =:= ResId;
-        (_) ->
-            false
-    end, Actions).
-
-%%------------------------------------------------------------------------------
-%% Resource Type Management
-%%------------------------------------------------------------------------------
-
--spec(get_resource_types() -> list(emqx_rule_engine:resource_type())).
-get_resource_types() ->
-    get_all_records(?RES_TYPE_TAB).
-
--spec(find_resource_type(Name :: resource_type_name()) -> {ok, emqx_rule_engine:resource_type()} | not_found).
-find_resource_type(Name) ->
-    case mnesia:dirty_read(?RES_TYPE_TAB, Name) of
-        [ResType] -> {ok, ResType};
-        [] -> not_found
-    end.
-
--spec(get_resources_by_type(Type :: resource_type_name()) -> list(emqx_rule_engine:resource())).
-get_resources_by_type(Type) ->
-    mnesia:dirty_index_read(?RES_TAB, Type, #resource.type).
-
--spec(register_resource_types(list(emqx_rule_engine:resource_type())) -> ok).
-register_resource_types(Types) ->
-    trans(fun lists:foreach/2, [fun insert_resource_type/1, Types]).
-
-%% @doc Unregister resource types of the App.
--spec(unregister_resource_types_of(App :: atom()) -> ok).
-unregister_resource_types_of(App) ->
-    trans(fun() ->
-            lists:foreach(fun delete_resource_type/1, mnesia:index_read(?RES_TYPE_TAB, App, #resource_type.provider))
-          end).
-
-%% @private
-insert_resource_type(Type) ->
-    mnesia:write(?RES_TYPE_TAB, Type, write).
-
-%% @private
-delete_resource_type(Type) ->
-    mnesia:delete_object(?RES_TYPE_TAB, Type, write).
+        end, Topics)
+    end, Rules).
 
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
@@ -500,7 +240,6 @@ get_all_records(Tab) ->
                                    end),
     Ret.
 
-trans(Fun) -> trans(Fun, []).
 trans(Fun, Args) ->
     case ekka_mnesia:transaction(?RULE_ENGINE_SHARD, Fun, Args) of
         {atomic, Result} -> Result;

+ 32 - 88
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -48,7 +48,7 @@
 -spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok).
 apply_rules([], _Input) ->
     ok;
-apply_rules([#rule{enabled = false}|More], Input) ->
+apply_rules([#rule{info = #{enabled := false}}|More], Input) ->
     apply_rules(More, Input);
 apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
     try apply_rule_discard_result(Rule, Input)
@@ -80,14 +80,14 @@ apply_rule(Rule = #rule{id = RuleID}, Input) ->
     clear_rule_payload(),
     do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
 
-do_apply_rule(#rule{id = RuleId,
-                    is_foreach = true,
-                    fields = Fields,
-                    doeach = DoEach,
-                    incase = InCase,
-                    conditions = Conditions,
-                    on_action_failed = OnFailed,
-                    actions = Actions}, Input) ->
+do_apply_rule(#rule{id = RuleId, info = #{
+            is_foreach := true,
+            fields := Fields,
+            doeach := DoEach,
+            incase := InCase,
+            conditions := Conditions,
+            outputs := Outputs
+        }}, Input) ->
     {Selected, Collection} = ?RAISE(select_and_collect(Fields, Input),
                                         {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
     ColumnsAndSelected = maps:merge(Input, Selected),
@@ -96,24 +96,24 @@ do_apply_rule(#rule{id = RuleId,
         true ->
             ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
             Collection2 = filter_collection(Input, InCase, DoEach, Collection),
-            {ok, [take_actions(Actions, Coll, Input, OnFailed) || Coll <- Collection2]};
+            {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]};
         false ->
             {error, nomatch}
     end;
 
-do_apply_rule(#rule{id = RuleId,
-                    is_foreach = false,
-                    fields = Fields,
-                    conditions = Conditions,
-                    on_action_failed = OnFailed,
-                    actions = Actions}, Input) ->
+do_apply_rule(#rule{id = RuleId, info = #{
+            is_foreach := false,
+            fields := Fields,
+            conditions := Conditions,
+            outputs := Outputs
+        }}, Input) ->
     Selected = ?RAISE(select_and_transform(Fields, Input),
                       {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
     case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
                 {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
         true ->
             ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
-            {ok, take_actions(Actions, Selected, Input, OnFailed)};
+            {ok, handle_output_list(Outputs, Selected, Input)};
         false ->
             {error, nomatch}
     end.
@@ -198,8 +198,6 @@ match_conditions({'fun', {_, Name}, Args}, Data) ->
     apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
 match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
     compare(Op, eval(L, Data), eval(R, Data));
-%%match_conditions({'like', Var, Pattern}, Data) ->
-%%    match_like(eval(Var, Data), Pattern);
 match_conditions({}, _Data) ->
     true.
 
@@ -229,81 +227,27 @@ number(Bin) ->
     catch error:badarg -> binary_to_float(Bin)
     end.
 
-%% Step3 -> Take actions
-take_actions(Actions, Selected, Envs, OnFailed) ->
-    [take_action(ActInst, Selected, Envs, OnFailed, ?ActionMaxRetry)
-     || ActInst <- Actions].
+handle_output_list(Outputs, Selected, Envs) ->
+    [handle_output(Out, Selected, Envs) || Out <- Outputs].
 
-take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
-            Selected, Envs, OnFailed, RetryN) when RetryN >= 0 ->
+handle_output(OutId, Selected, Envs) ->
     try
-        {ok, #action_instance_params{apply = Apply}}
-            = emqx_rule_registry:get_action_instance_params(Id),
-        emqx_rule_metrics:inc_actions_taken(Id),
-        apply_action_func(Selected, Envs, Apply, ActName)
-    of
-        {badact, Reason} ->
-            handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason);
-        Result -> Result
+        do_handle_output(OutId, Selected, Envs)
     catch
-        error:{badfun, _Func}:_ST ->
-            %?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
-            %              "Func: ~p~nST:~0p", [Id, Func, ST]),
-            _ = trans_action_on(Id, fun() ->
-                emqx_rule_engine:refresh_actions([ActInst])
-            end, 5000),
-            emqx_rule_metrics:inc_actions_retry(Id),
-            take_action(ActInst, Selected, Envs, OnFailed, RetryN-1);
-        Error:Reason:Stack ->
-            emqx_rule_metrics:inc_actions_exception(Id),
-            handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack})
-    end;
-
-take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) ->
-    emqx_rule_metrics:inc_actions_error(Id),
-    handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}).
-
-apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) ->
-    %% TODO: Build the Func Name when creating the action
-    Func = cbk_on_action_triggered(Name),
-    Mod:Func(Data, Envs#{'__bindings__' => Bindings});
-apply_action_func(Data, Envs, Func, _Name) when is_function(Func) ->
-    erlang:apply(Func, [Data, Envs]).
-
-cbk_on_action_triggered(Name) ->
-    list_to_atom("on_action_" ++ atom_to_list(Name)).
-
-trans_action_on(Id, Callback, Timeout) ->
-    case emqx_rule_locker:lock(Id) of
-        true -> try Callback() after emqx_rule_locker:unlock(Id) end;
-        _ ->
-            wait_action_on(Id, Timeout div 10)
+        Err:Reason:ST ->
+            ?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}])
     end.
 
-wait_action_on(_, 0) ->
-    {error, timeout};
-wait_action_on(Id, RetryN) ->
-    timer:sleep(10),
-    case emqx_rule_registry:get_action_instance_params(Id) of
-        not_found ->
-            {error, not_found};
-        {ok, #action_instance_params{apply = Apply}} ->
-            case catch apply_action_func(baddata, #{}, Apply, tryit) of
-                {'EXIT', {{badfun, _}, _}} ->
-                    wait_action_on(Id, RetryN-1);
-                _ ->
-                    ok
-            end
-    end.
+do_handle_output(<<"bridge:", _/binary>> = _ChannelId, _Selected, _Envs) ->
+    ?LOG(warning, "calling bridge from rules has not been implemented yet!");
 
-handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) ->
-    ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]),
-    _ = take_actions(Fallbacks, Selected, Envs, continue),
-    failed;
-handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) ->
-    ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]),
-    _ = take_actions(Fallbacks, Selected, Envs, continue),
-    error({take_action_failed, {Id, Reason}}).
+do_handle_output(BuiltInOutput, Selected, Envs) ->
+    try binary_to_existing_atom(BuiltInOutput) of Func ->
+        erlang:apply(emqx_rule_outputs, Func, [Selected, Envs])
+    catch
+        error:badarg -> error(not_found);
+        error:undef -> error(not_found)
+    end.
 
 eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
     nested_get({path, Path}, may_decode_payload(Payload));

+ 4 - 4
apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl

@@ -18,7 +18,7 @@
 
 -include("rule_engine.hrl").
 
--export([parse_select/1]).
+-export([parse/1]).
 
 -export([ select_fields/1
         , select_is_foreach/1
@@ -50,12 +50,12 @@
 
 %% Dialyzer gives up on the generated code.
 %% probably due to stack depth, or inlines.
--dialyzer({nowarn_function, [parse_select/1]}).
+-dialyzer({nowarn_function, [parse/1]}).
 
 %% Parse one select statement.
--spec(parse_select(string() | binary())
+-spec(parse(string() | binary())
       -> {ok, select()} | {parse_error, term()} | {lex_error, term()}).
-parse_select(Sql) ->
+parse(Sql) ->
     try case rulesql:parsetree(Sql) of
             {ok, {select, Clauses}} ->
                 {ok, #select{

+ 20 - 30
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -18,6 +18,7 @@
 -include_lib("emqx/include/logger.hrl").
 
 -export([ test/1
+        , echo_action/2
         ]).
 
 %% Dialyzer gives up on the generated code.
@@ -25,15 +26,14 @@
 -dialyzer({nowarn_function, [test/1,
                              test_rule/4,
                              flatten/1,
-                             sql_test_action/0,
                              fill_default_values/2,
                              envs_examp/1
                              ]}).
 
 -spec(test(#{}) -> {ok, map() | list()} | {error, term()}).
-test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
-    {ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
-    InTopic = maps:get(<<"topic">>, Context, <<>>),
+test(#{sql := Sql, context := Context}) ->
+    {ok, Select} = emqx_rule_sqlparser:parse(Sql),
+    InTopic = maps:get(topic, Context, <<>>),
     EventTopics = emqx_rule_sqlparser:select_from(Select),
     case lists:all(fun is_publish_topic/1, EventTopics) of
         true ->
@@ -48,38 +48,30 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
     end.
 
 test_rule(Sql, Select, Context, EventTopics) ->
-    RuleId = iolist_to_binary(["test_rule", emqx_misc:gen_id()]),
-    ActInstId = iolist_to_binary(["test_action", emqx_misc:gen_id()]),
+    RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
     ok = emqx_rule_metrics:create_rule_metrics(RuleId),
-    ok = emqx_rule_metrics:create_metrics(ActInstId),
     Rule = #rule{
         id = RuleId,
-        rawsql = Sql,
-        for = EventTopics,
-        is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
-        fields = emqx_rule_sqlparser:select_fields(Select),
-        doeach = emqx_rule_sqlparser:select_doeach(Select),
-        incase = emqx_rule_sqlparser:select_incase(Select),
-        conditions = emqx_rule_sqlparser:select_where(Select),
-        actions = [#action_instance{
-                    id = ActInstId,
-                    name = test_rule_sql}]
+        info = #{
+            sql => Sql,
+            from => EventTopics,
+            outputs => [<<"get_selected_data">>],
+            enabled => true,
+            is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
+            fields => emqx_rule_sqlparser:select_fields(Select),
+            doeach => emqx_rule_sqlparser:select_doeach(Select),
+            incase => emqx_rule_sqlparser:select_incase(Select),
+            conditions => emqx_rule_sqlparser:select_where(Select)
+        }
     },
     FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
     try
-        ok = emqx_rule_registry:add_action_instance_params(
-                #action_instance_params{id = ActInstId,
-                                        params = #{},
-                                        apply = sql_test_action()}),
-        R = emqx_rule_runtime:apply_rule(Rule, FullContext),
-        emqx_rule_metrics:clear_rule_metrics(RuleId),
-        emqx_rule_metrics:clear_metrics(ActInstId),
-        R
+        emqx_rule_runtime:apply_rule(Rule, FullContext)
     of
         {ok, Data} -> {ok, flatten(Data)};
         {error, nomatch} -> {error, nomatch}
     after
-        ok = emqx_rule_registry:remove_action_instance_params(ActInstId)
+        emqx_rule_metrics:clear_rule_metrics(RuleId)
     end.
 
 is_publish_topic(<<"$events/", _/binary>>) -> false;
@@ -90,10 +82,8 @@ flatten([D1]) -> D1;
 flatten([D1 | L]) when is_list(D1) ->
     D1 ++ flatten(L).
 
-sql_test_action() ->
-    fun(Data, _Envs) ->
-        ?LOG(info, "Testing Rule SQL OK"), Data
-    end.
+echo_action(Data, _Envs) ->
+    ?LOG(info, "Testing Rule SQL OK"), Data.
 
 fill_default_values(Event, Context) ->
     maps:merge(envs_examp(Event), Context).

+ 0 - 195
apps/emqx_rule_engine/src/emqx_rule_validator.erl

@@ -1,195 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_rule_validator).
-
--include("rule_engine.hrl").
-
--export([ validate_params/2
-        , validate_spec/1
-        ]).
-
--type name() :: atom().
-
--type spec() :: #{
-    type := data_type(),
-    required => boolean(),
-    default => term(),
-    enum => list(term()),
-    schema => spec()
-}.
-
--type data_type() :: string | password | number | boolean
-    | object | array | file | cfgselect.
-
--type params_spec() :: #{name() => spec()} | any.
--type params() :: #{binary() => term()}.
-
--define(DATA_TYPES,
-        [ string
-        , password %% TODO: [5.0] remove this, use string instead
-        , number
-        , boolean
-        , object
-        , array
-        , file
-        , cfgselect %% TODO: [5.0] refactor this
-        ]).
-
-%%------------------------------------------------------------------------------
-%% APIs
-%%------------------------------------------------------------------------------
-
-%% Validate the params according to the spec.
-%% Some keys will be added into the result if they have default values in spec.
-%% Note that this function throws exception in case of validation failure.
--spec(validate_params(params(), params_spec()) -> params()).
-validate_params(Params, any) -> Params;
-validate_params(Params, ParamsSepc) ->
-    maps:fold(fun(Name, Spec, Params0) ->
-        IsRequired = maps:get(required, Spec, false),
-        BinName = bin(Name),
-        find_field(Name, Params,
-            fun (not_found) when IsRequired =:= true ->
-                    throw({required_field_missing, BinName});
-                (not_found) when IsRequired =:= false ->
-                    case maps:find(default, Spec) of
-                        {ok, Default} -> Params0#{BinName => Default};
-                        error -> Params0
-                    end;
-                (Val) ->
-                    Params0#{BinName => validate_value(Val, Spec)}
-            end)
-    end, Params, ParamsSepc).
-
--spec(validate_spec(params_spec()) -> ok).
-validate_spec(any) -> ok;
-validate_spec(ParamsSepc) ->
-    map_foreach(fun do_validate_spec/2, ParamsSepc).
-
-%%------------------------------------------------------------------------------
-%% Internal Functions
-%%------------------------------------------------------------------------------
-
-validate_value(Val, #{enum := Enum}) ->
-    validate_enum(Val, Enum);
-validate_value(Val, #{type := object} = Spec) ->
-    validate_params(Val, maps:get(schema, Spec, any));
-validate_value(Val, #{type := Type} = Spec) ->
-    validate_type(Val, Type, Spec).
-
-validate_type(Val, file, _Spec) ->
-    validate_file(Val);
-validate_type(Val, String, Spec) when String =:= string;
-                                      String =:= password ->
-    validate_string(Val, reg_exp(maps:get(format, Spec, any)));
-validate_type(Val, number, Spec) ->
-    validate_number(Val, maps:get(range, Spec, any));
-validate_type(Val, boolean, _Spec) ->
-    validate_boolean(Val);
-validate_type(Val, array, Spec) ->
-    ItemsSpec = maps:get(items, Spec),
-    [validate_value(V, ItemsSpec) || V <- Val];
-validate_type(Val, cfgselect, _Spec) ->
-    %% TODO: [5.0] refactor this.
-    Val.
-
-validate_enum(Val, Enum) ->
-    case lists:member(Val, Enum) of
-        true -> Val;
-        false -> throw({invalid_data_type, {enum, {Val, Enum}}})
-    end.
-
-validate_string(Val, RegExp) ->
-    try re:run(Val, RegExp) of
-        nomatch -> throw({invalid_data_type, {string, Val}});
-        _Match -> Val
-    catch
-        _:_ -> throw({invalid_data_type, {string, Val}})
-    end.
-
-validate_number(Val, any) when is_integer(Val); is_float(Val) ->
-    Val;
-validate_number(Val, _Range = [Min, Max])
-        when (is_integer(Val) orelse is_float(Val)),
-             (Val >= Min andalso Val =< Max) ->
-    Val;
-validate_number(Val, Range) ->
-    throw({invalid_data_type, {number, {Val, Range}}}).
-
-validate_boolean(true) -> true;
-validate_boolean(<<"true">>) -> true;
-validate_boolean(false) -> false;
-validate_boolean(<<"false">>) -> false;
-validate_boolean(Val) -> throw({invalid_data_type, {boolean, Val}}).
-
-validate_file(Val) when is_map(Val) -> Val;
-validate_file(Val) when is_list(Val) -> Val;
-validate_file(Val) when is_binary(Val) -> Val;
-validate_file(Val) -> throw({invalid_data_type, {file, Val}}).
-
-reg_exp(url) -> "^https?://\\w+(\.\\w+)*(:[0-9]+)?";
-reg_exp(topic) -> "^/?(\\w|\\#|\\+)+(/?(\\w|\\#|\\+))*/?$";
-reg_exp(resource_type) -> "[a-zA-Z0-9_:-]";
-reg_exp(any) -> ".*";
-reg_exp(RegExp) -> RegExp.
-
-do_validate_spec(Name, #{type := object} = Spec) ->
-    find_field(schema, Spec,
-        fun (not_found) -> throw({required_field_missing, {schema, {in, Name}}});
-            (Schema) -> validate_spec(Schema)
-        end);
-do_validate_spec(Name, #{type := array} = Spec) ->
-    find_field(items, Spec,
-        fun (not_found) -> throw({required_field_missing, {items, {in, Name}}});
-            (Items) -> do_validate_spec(Name, Items)
-        end);
-do_validate_spec(_Name, #{type := Type}) ->
-    _ = supported_data_type(Type, ?DATA_TYPES);
-
-do_validate_spec(Name, _Spec) ->
-    throw({required_field_missing, {type, {in, Name}}}).
-
-supported_data_type(Type, Supported) ->
-    case lists:member(Type, Supported) of
-        false -> throw({unsupported_data_types, Type});
-        true -> ok
-    end.
-
-map_foreach(Fun, Map) ->
-    Iterator = maps:iterator(Map),
-    map_foreach_loop(Fun, maps:next(Iterator)).
-
-map_foreach_loop(_Fun, none) -> ok;
-map_foreach_loop(Fun, {Key, Value, Iterator}) ->
-    _ = Fun(Key, Value),
-    map_foreach_loop(Fun, maps:next(Iterator)).
-
-find_field(Field, Spec, Func) ->
-    do_find_field([bin(Field), Field], Spec, Func).
-
-do_find_field([], _Spec, Func) ->
-    Func(not_found);
-do_find_field([F | Fields], Spec, Func) ->
-    case maps:find(F, Spec) of
-        {ok, Value} -> Func(Value);
-        error ->
-            do_find_field(Fields, Spec, Func)
-    end.
-
-bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
-bin(Str) when is_list(Str) -> iolist_to_binary(Str);
-bin(Bin) when is_binary(Bin) -> Bin.

+ 8 - 8
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -79,8 +79,8 @@ groups() ->
        t_create_existing_rule,
        t_update_rule,
        t_disable_rule,
-       t_get_rules_for,
-       t_get_rules_for_2,
+       t_get_rules_for_topic,
+       t_get_rules_for_topic_2,
        t_get_rules_with_same_event,
        t_add_get_remove_action,
        t_add_get_remove_actions,
@@ -650,12 +650,12 @@ t_disable_rule(_Config) ->
     ?assert(DAt3 < Now3),
     ok = emqx_rule_engine:delete_rule(<<"simple_rule_2">>).
 
-t_get_rules_for(_Config) ->
-    Len0 = length(emqx_rule_registry:get_rules_for(<<"simple/topic">>)),
+t_get_rules_for_topic(_Config) ->
+    Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>)),
     ok = emqx_rule_registry:add_rules(
             [make_simple_rule(<<"rule-debug-1">>),
              make_simple_rule(<<"rule-debug-2">>)]),
-    ?assertEqual(Len0+2, length(emqx_rule_registry:get_rules_for(<<"simple/topic">>))),
+    ?assertEqual(Len0+2, length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>))),
     ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]),
     ok.
 
@@ -672,8 +672,8 @@ t_get_rules_ordered_by_ts(_Config) ->
         #rule{id = <<"rule-debug-2">>}
     ], emqx_rule_registry:get_rules_ordered_by_ts()).
 
-t_get_rules_for_2(_Config) ->
-    Len0 = length(emqx_rule_registry:get_rules_for(<<"simple/1">>)),
+t_get_rules_for_topic_2(_Config) ->
+    Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>)),
     ok = emqx_rule_registry:add_rules(
             [make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
              make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]),
@@ -682,7 +682,7 @@ t_get_rules_for_2(_Config) ->
              make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]),
              make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>])
              ]),
-    ?assertEqual(Len0+4, length(emqx_rule_registry:get_rules_for(<<"simple/1">>))),
+    ?assertEqual(Len0+4, length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>))),
     ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
     ok.
 

+ 0 - 109
apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl

@@ -1,109 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_rule_monitor_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include_lib("emqx_rule_engine/include/rule_engine.hrl").
--include_lib("emqx/include/emqx.hrl").
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("common_test/include/ct.hrl").
-
-all() ->
-    [ {group, resource}
-    ].
-
-suite() ->
-    [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
-
-groups() ->
-    [{resource, [sequence],
-      [ t_restart_resource
-      ]}
-    ].
-
-init_per_suite(Config) ->
-    application:load(emqx_machine),
-    ok = ekka_mnesia:start(),
-    ok = emqx_rule_registry:mnesia(boot),
-    Config.
-
-end_per_suite(_Config) ->
-    ok.
-
-init_per_testcase(t_restart_resource, Config) ->
-    Opts = [public, named_table, set, {read_concurrency, true}],
-    _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
-    ets:new(t_restart_resource, [named_table, public]),
-    ets:insert(t_restart_resource, {failed_count, 0}),
-    ets:insert(t_restart_resource, {succ_count, 0}),
-    Config;
-
-init_per_testcase(_, Config) ->
-    Config.
-
-end_per_testcase(t_restart_resource, Config) ->
-    ets:delete(t_restart_resource),
-    Config;
-end_per_testcase(_, Config) ->
-    Config.
-
-t_restart_resource(_) ->
-    {ok, _} = emqx_rule_monitor:start_link(),
-    emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc,1000),
-    ok = emqx_rule_registry:register_resource_types(
-            [#resource_type{
-                name = test_res_1,
-                provider = ?APP,
-                params_spec = #{},
-                on_create = {?MODULE, on_resource_create},
-                on_destroy = {?MODULE, on_resource_destroy},
-                on_status = {?MODULE, on_get_resource_status},
-                title = #{en => <<"Test Resource">>},
-                description = #{en => <<"Test Resource">>}}]),
-    ok = emqx_rule_engine:load_providers(),
-    {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
-            #{type => test_res_1,
-              config => #{},
-              description => <<"debug resource">>}),
-    [{_, 1}] = ets:lookup(t_restart_resource, failed_count),
-    [{_, 0}] = ets:lookup(t_restart_resource, succ_count),
-    ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]),
-    emqx_rule_monitor:ensure_resource_retrier(ResId, 100),
-    timer:sleep(1000),
-    [{_, 5}] = ets:lookup(t_restart_resource, failed_count),
-    [{_, 1}] = ets:lookup(t_restart_resource, succ_count),
-    #{retryers := Pids} = sys:get_state(whereis(emqx_rule_monitor)),
-    ?assertEqual(0, map_size(Pids)),
-    ok = emqx_rule_engine:unload_providers(),
-    emqx_rule_registry:remove_resource(ResId),
-    emqx_rule_monitor:stop(),
-    ok.
-
-on_resource_create(Id, _) ->
-    case ets:lookup(t_restart_resource, failed_count) of
-        [{_, 5}] ->
-            ets:insert(t_restart_resource, {succ_count, 1}),
-            #{};
-        [{_, N}] ->
-            ets:insert(t_restart_resource, {failed_count, N+1}),
-            error({incorrect_params, Id})
-    end.
-on_resource_destroy(_Id, _) -> ok.
-on_get_resource_status(_Id, _) -> #{}.

+ 1 - 1
apps/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl

@@ -38,7 +38,7 @@ end_per_testcase(_TestCase, Config) ->
 % t_start_link(_) ->
 %     error('TODO').
 
-% t_get_rules_for(_) ->
+% t_get_rules_for_topic(_) ->
 %     error('TODO').
 
 % t_add_rules(_) ->

+ 0 - 191
apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl

@@ -1,191 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_rule_validator_SUITE).
-
--compile(nowarn_export_all).
--compile(export_all).
-
--include_lib("eunit/include/eunit.hrl").
-
--define(VALID_SPEC,
-    #{
-        string_required => #{
-            type => string,
-            required => true
-        },
-        string_optional_with_default => #{
-            type => string,
-            required => false,
-            default => <<"a/b">>
-        },
-        string_optional_without_default_0 => #{
-            type => string,
-            required => false
-        },
-        string_optional_without_default_1 => #{
-            type => string
-        },
-        type_number => #{
-            type => number,
-            required => true
-        },
-        type_boolean => #{
-            type => boolean,
-            required => true
-        },
-        type_enum_number => #{
-            type => number,
-            enum => [-1, 0, 1, 2],
-            required => true
-        },
-        type_file => #{
-            type => file,
-            required => true
-        },
-        type_object => #{
-            type => object,
-            required => true,
-            schema => #{
-                string_required => #{
-                    type => string,
-                    required => true
-                },
-                type_number => #{
-                    type => number,
-                    required => true
-                }
-            }
-        },
-        type_array => #{
-            type => array,
-            required => true,
-            items => #{
-                type => string,
-                required => true
-            }
-        }
-    }).
-
-all() -> emqx_ct:all(?MODULE).
-
-t_validate_spec_the_complex(_) ->
-    ok = emqx_rule_validator:validate_spec(?VALID_SPEC).
-
-t_validate_spec_invalid_1(_) ->
-    ?assertThrow({required_field_missing, {type, _}},
-        emqx_rule_validator:validate_spec(#{
-            type_enum_number => #{
-                required => true
-            }
-        })).
-
-t_validate_spec_invalid_2(_) ->
-    ?assertThrow({required_field_missing, {schema, _}},
-        emqx_rule_validator:validate_spec(#{
-            type_enum_number => #{
-                type => object
-            }
-        })).
-
-t_validate_spec_invalid_3(_) ->
-    ?assertThrow({required_field_missing, {items, _}},
-        emqx_rule_validator:validate_spec(#{
-            type_enum_number => #{
-                type => array
-            }
-        })).
-
-t_validate_params_0(_) ->
-    Params = #{<<"eee">> => <<"eee">>},
-    Specs = #{<<"eee">> => #{
-                type => string,
-                required => true
-             }},
-    ?assertEqual(Params,
-        emqx_rule_validator:validate_params(Params, Specs)).
-
-t_validate_params_1(_) ->
-    Params = #{<<"eee">> => 1},
-    Specs = #{<<"eee">> => #{
-                type => string,
-                required => true
-             }},
-    ?assertThrow({invalid_data_type, {string, 1}},
-        emqx_rule_validator:validate_params(Params, Specs)).
-
-t_validate_params_2(_) ->
-    ?assertThrow({required_field_missing, <<"eee">>},
-        emqx_rule_validator:validate_params(
-            #{<<"abc">> => 1},
-            #{<<"eee">> => #{
-                type => string,
-                required => true
-             }})).
-
-t_validate_params_format(_) ->
-    Params = #{<<"eee">> => <<"abc">>},
-    Params1 = #{<<"eee">> => <<"http://abc:8080">>},
-    Params2 = #{<<"eee">> => <<"http://abc">>},
-    Specs = #{<<"eee">> => #{
-                type => string,
-                format => url,
-                required => true
-             }},
-    ?assertThrow({invalid_data_type, {string, <<"abc">>}},
-        emqx_rule_validator:validate_params(Params, Specs)),
-    ?assertEqual(Params1,
-        emqx_rule_validator:validate_params(Params1, Specs)),
-    ?assertEqual(Params2,
-        emqx_rule_validator:validate_params(Params2, Specs)).
-
-t_validate_params_fill_default(_) ->
-    Params = #{<<"abc">> => 1},
-    Specs = #{<<"eee">> => #{
-                type => string,
-                required => false,
-                default => <<"hello">>
-             }},
-    ?assertMatch(#{<<"abc">> := 1, <<"eee">> := <<"hello">>},
-        emqx_rule_validator:validate_params(Params, Specs)).
-
-t_validate_params_the_complex(_) ->
-    Params = #{
-        <<"string_required">> => <<"hello">>,
-        <<"type_number">> => 1,
-        <<"type_boolean">> => true,
-        <<"type_enum_number">> => 2,
-        <<"type_file">> => <<"">>,
-        <<"type_object">> => #{
-            <<"string_required">> => <<"hello2">>,
-            <<"type_number">> => 1.3
-        },
-        <<"type_array">> => [<<"ok">>, <<"no">>]
-    },
-    ?assertMatch(
-        #{  <<"string_required">> := <<"hello">>,
-            <<"string_optional_with_default">> := <<"a/b">>,
-            <<"type_number">> := 1,
-            <<"type_boolean">> := true,
-            <<"type_enum_number">> := 2,
-            <<"type_file">> := <<"">>,
-            <<"type_object">> := #{
-                <<"string_required">> := <<"hello2">>,
-                <<"type_number">> := 1.3
-            },
-            <<"type_array">> := [<<"ok">>, <<"no">>]
-        },
-        emqx_rule_validator:validate_params(Params, ?VALID_SPEC)).