| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
%% Application atom used by the rule engine (presumably the OTP
%% application name -- confirm against the .app file).
-define(APP, emqx_rule_engine).

%% Atom naming the rule engine key/value table (how the table is created
%% and used is not visible in this header).
-define(KV_TAB, '@rule_engine_db').
%% Optional value: either T or the atom `undefined`.
%% The type name is quoted because `maybe` becomes a reserved word once the
%% `maybe_expr` feature is enabled (the default from OTP 27). Quoting does
%% not change the type's name.
-type 'maybe'(T) :: T | undefined.

%% Identifiers and names.
-type rule_id() :: binary().
-type rule_name() :: binary().
-type resource_id() :: binary().
-type action_instance_id() :: binary().
-type action_name() :: atom().
-type resource_type_name() :: atom().

%% Closed set of action categories.
-type category() :: data_persist | data_forward | offline_msgs | debug | other.

%% Description map: the `en` entry is mandatory, the `zh` entry is optional.
-type descr() :: #{en := binary(), zh => binary()}.

%% A {Module, Function} pair.
-type mf() :: {Module :: atom(), Fun :: atom()}.

-type hook() :: atom() | 'any'.
-type topic() :: binary().

%% Status map of a resource.
%% NOTE(review): this type requires the key `alive`, whereas the default
%% status of #resource_params{} in this header uses `is_alive` -- confirm
%% which key is intended before relying on this type.
-type resource_status() :: #{
    alive := boolean(),
    atom() => binary() | atom() | list(binary() | atom())
}.
%% Default description value: both the `en` and `zh` entries are empty
%% binaries.
-define(descr, #{
    en => <<>>,
    zh => <<>>
}).
%% Record describing an action. Field order is significant (records are
%% tuples at runtime) and is unchanged.
-record(action, {
    name :: action_name(),
    category :: category(),
    for :: hook(),
    app :: atom(),
    types = [] :: list(resource_type_name()),
    module :: module(),
    on_create :: mf(),
    %% `maybe` is quoted because it is a reserved word once the `maybe_expr`
    %% feature is enabled (default from OTP 27); the referenced type is the
    %% same one.
    on_destroy :: 'maybe'(mf()),
    hidden = false :: boolean(),
    %% params specs
    params_spec :: #{atom() => term()},
    title = ?descr :: descr(),
    description = ?descr :: descr()
}).
%% Record describing one instance of an action. `fallbacks` is itself a
%% list of #action_instance{} records.
-record(action_instance, {
    id :: action_instance_id(),
    name :: action_name(),
    fallbacks :: [#action_instance{}],
    %% the args got from API for initializing action_instance
    args :: #{binary() => term()}
}).
%% Record describing a rule. Field order is significant (records are
%% tuples at runtime) and is unchanged.
-record(rule, {
    id :: rule_id(),
    for :: [topic()],
    rawsql :: binary(),
    is_foreach :: boolean(),
    fields :: list(),
    doeach :: term(),
    incase :: list(),
    conditions :: tuple(),
    on_action_failed :: continue | stop,
    actions :: [#action_instance{}],
    enabled :: boolean(),
    %% epoch in millisecond precision
    created_at :: integer(),
    description :: binary(),
    state = normal :: atom()
}).
%% Record describing a resource.
-record(resource, {
    id :: resource_id(),
    type :: resource_type_name(),
    %% the configs got from API for initializing resource.
    %% Typed as map(): in type syntax `#{}` denotes only the empty map,
    %% which would rule out any non-empty config.
    config :: map(),
    %% epoch in millisecond precision
    created_at :: integer() | undefined,
    description :: binary()
}).
%% Record describing a resource type, including the {Module, Function}
%% callbacks stored in on_create / on_status / on_destroy.
-record(resource_type, {
    name :: resource_type_name(),
    provider :: atom(),
    %% params specs
    params_spec :: #{atom() => term()},
    on_create :: mf(),
    on_status :: mf(),
    on_destroy :: mf(),
    title = ?descr :: descr(),
    description = ?descr :: descr()
}).
%% Record pairing a hook name with a rule id.
-record(rule_hooks, {
    hook :: atom(),
    rule_id :: rule_id()
}).
%% Record holding the parameters and status of an initialized resource.
-record(resource_params, {
    id :: resource_id(),
    %% the params got after initializing the resource.
    %% Typed as map(): in type syntax `#{}` denotes only the empty map.
    params :: map(),
    %% Defaults to "not alive"; the `is_alive` key is mandatory, any other
    %% atom key is allowed.
    status = #{is_alive => false} :: #{is_alive := boolean(), atom() => term()}
}).
%% Record holding the parameters and the callable of an initialized
%% action instance.
-record(action_instance_params, {
    id :: action_instance_id(),
    %% the params got after initializing the action.
    %% Typed as map(): in type syntax `#{}` denotes only the empty map.
    params :: map(),
    %% the Func/Bindings got after initializing the action: either a
    %% 2-arity fun taking (Data, Envs) or a map carrying `mod` and `bindings`.
    apply :: fun((Data :: map(), Envs :: map()) -> any())
           | #{mod := module(), bindings := #{atom() => term()}}
}).
%% Arithmetic operators
%% True when Op is exactly one of '+', '-', '*', '/' or 'div'.
%% Built only from `=:=` and `orelse`, so the expansion is legal in a guard.
%% Op is expanded once per comparison.
-define(is_arith(Op),
    (Op =:= '+' orelse Op =:= '-' orelse Op =:= '*' orelse
     Op =:= '/' orelse Op =:= 'div')
).
%% Compare operators
%% True when Op is exactly one of '=', '=~', '>', '<', '<=', '>=', '<>'
%% or '!='. Built only from `=:=` and `orelse`, so the expansion is legal
%% in a guard. Op is expanded once per comparison.
-define(is_comp(Op),
    (Op =:= '=' orelse Op =:= '=~' orelse
     Op =:= '>' orelse Op =:= '<' orelse
     Op =:= '<=' orelse Op =:= '>=' orelse
     Op =:= '<>' orelse Op =:= '!=')
).
%% Logical operators
%% True when Op is exactly 'and' or 'or'; legal in a guard.
-define(is_logical(Op),
    (Op =:= 'and' orelse Op =:= 'or')
).
%% ?RAISE(Exp, Error)
%% Evaluates Exp and returns its value; if Exp raises an exception of any
%% class, throws Error instead. Delegates to ?RAISE/3 with a no-op
%% on-failure expression.
-define(RAISE(_EXP_, _ERROR_),
    ?RAISE(_EXP_, _ = do_nothing, _ERROR_)
).

%% ?RAISE(Exp, ExpOnFail, Error)
%% Evaluates Exp and returns its value; if Exp raises an exception of any
%% class, evaluates ExpOnFail and then throws Error.
%% The expansion is an immediately-invoked fun. ExpOnFail and Error are
%% expanded inside the catch clause, so they may refer to _EXCLASS_,
%% _EXCPTION_ and _ST_ (exception class, reason and stacktrace); those
%% names are therefore left exactly as they are.
-define(RAISE(_EXP_, _EXP_ON_FAIL_, _ERROR_),
    fun() ->
        try
            (_EXP_)
        catch
            _EXCLASS_:_EXCPTION_:_ST_ ->
                _EXP_ON_FAIL_,
                throw(_ERROR_)
        end
    end()
).
%% ?CLUSTER_CALL(Func, Args)
%% Same as ?CLUSTER_CALL/3 with `ok` as the expected per-node result.
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).

%% ?CLUSTER_CALL(Func, Args, ResPattern)
%% Calls ?MODULE:Func(Args...) on every node returned by
%% ekka_mnesia:running_nodes() via rpc:multicall/5 with a 30000 ms timeout.
%%   * Every node answered and every result matches ResPattern:
%%     returns the list of results.
%%   * Every node answered but some results do not match ResPattern:
%%     logs and throws {Func, NonMatchingResults}.
%%   * Some nodes failed to answer: logs and throws
%%     {Func, {failed_on_nodes, BadNodes}}.
%% The expansion is an immediately-invoked fun. Its internal variables use
%% _UPPER_ names (as ?RAISE does) so that a variable already bound in the
%% calling function, such as ResL, cannot turn the case patterns below
%% into matches against that existing value.
-define(CLUSTER_CALL(Func, Args, ResPattern),
    fun() ->
        case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of
            {_RESL_, []} ->
                %% Keep only the results that do NOT match the expected pattern.
                case lists:filter(fun(ResPattern) -> false; (_) -> true end, _RESL_) of
                    [] ->
                        _RESL_;
                    _ERRL_ ->
                        ?LOG(error, "cluster_call error found, ResL: ~p", [_RESL_]),
                        throw({Func, _ERRL_})
                end;
            {_RESL_, _BADNODES_} ->
                ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [_BADNODES_, _RESL_]),
                throw({Func, {failed_on_nodes, _BADNODES_}})
        end
    end()
).
%% Tables
%% Atoms naming the rule engine tables (where the tables are created is
%% not visible in this header).
-define(RULE_TAB,               emqx_rule).
-define(ACTION_TAB,             emqx_rule_action).
-define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params).
-define(RES_TAB,                emqx_resource).
-define(RES_PARAMS_TAB,         emqx_resource_params).
-define(RULE_HOOKS,             emqx_rule_hooks).
-define(RES_TYPE_TAB,           emqx_resource_type).
|