rule_engine.hrl 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -define(APP, emqx_rule_engine).
  17. -define(KV_TAB, '@rule_engine_db').
  18. -type(maybe(T) :: T | undefined).
  19. -type(rule_id() :: binary()).
  20. -type(rule_name() :: binary()).
  21. -type(resource_id() :: binary()).
  22. -type(action_instance_id() :: binary()).
  23. -type(action_name() :: atom()).
  24. -type(resource_type_name() :: atom()).
  25. -type(category() :: data_persist| data_forward | offline_msgs | debug | other).
  26. -type(descr() :: #{en := binary(), zh => binary()}).
  27. -type(mf() :: {Module::atom(), Fun::atom()}).
  28. -type(hook() :: atom() | 'any').
  29. -type(topic() :: binary()).
  30. -type(resource_status() :: #{ alive := boolean()
  31. , atom() => binary() | atom() | list(binary()|atom())
  32. }).
  33. -define(descr, #{en => <<>>, zh => <<>>}).
  34. -record(action,
  35. { name :: action_name()
  36. , category :: category()
  37. , for :: hook()
  38. , app :: atom()
  39. , types = [] :: list(resource_type_name())
  40. , module :: module()
  41. , on_create :: mf()
  42. , on_destroy :: maybe(mf())
  43. , hidden = false :: boolean()
  44. , params_spec :: #{atom() => term()} %% params specs
  45. , title = ?descr :: descr()
  46. , description = ?descr :: descr()
  47. }).
  48. -record(action_instance,
  49. { id :: action_instance_id()
  50. , name :: action_name()
  51. , fallbacks :: list(#action_instance{})
  52. , args :: #{binary() => term()} %% the args got from API for initializing action_instance
  53. }).
  54. -record(rule,
  55. { id :: rule_id()
  56. , for :: list(topic())
  57. , rawsql :: binary()
  58. , is_foreach :: boolean()
  59. , fields :: list()
  60. , doeach :: term()
  61. , incase :: list()
  62. , conditions :: tuple()
  63. , on_action_failed :: continue | stop
  64. , actions :: list(#action_instance{})
  65. , enabled :: boolean()
  66. , created_at :: integer() %% epoch in millisecond precision
  67. , description :: binary()
  68. , state = normal :: atom()
  69. }).
  70. -record(resource,
  71. { id :: resource_id()
  72. , type :: resource_type_name()
  73. , config :: #{} %% the configs got from API for initializing resource
  74. , created_at :: integer() | undefined %% epoch in millisecond precision
  75. , description :: binary()
  76. }).
  77. -record(resource_type,
  78. { name :: resource_type_name()
  79. , provider :: atom()
  80. , params_spec :: #{atom() => term()} %% params specs
  81. , on_create :: mf()
  82. , on_status :: mf()
  83. , on_destroy :: mf()
  84. , title = ?descr :: descr()
  85. , description = ?descr :: descr()
  86. }).
  87. -record(rule_hooks,
  88. { hook :: atom()
  89. , rule_id :: rule_id()
  90. }).
  91. -record(resource_params,
  92. { id :: resource_id()
  93. , params :: #{} %% the params got after initializing the resource
  94. , status = #{is_alive => false} :: #{is_alive := boolean(), atom() => term()}
  95. }).
  96. -record(action_instance_params,
  97. { id :: action_instance_id()
  98. %% the params got after initializing the action
  99. , params :: #{}
  100. %% the Func/Bindings got after initializing the action
  101. , apply :: fun((Data::map(), Envs::map()) -> any())
  102. | #{mod := module(), bindings := #{atom() => term()}}
  103. }).
  104. %% Arithmetic operators
  105. -define(is_arith(Op), (Op =:= '+' orelse
  106. Op =:= '-' orelse
  107. Op =:= '*' orelse
  108. Op =:= '/' orelse
  109. Op =:= 'div')).
  110. %% Compare operators
  111. -define(is_comp(Op), (Op =:= '=' orelse
  112. Op =:= '=~' orelse
  113. Op =:= '>' orelse
  114. Op =:= '<' orelse
  115. Op =:= '<=' orelse
  116. Op =:= '>=' orelse
  117. Op =:= '<>' orelse
  118. Op =:= '!=')).
  119. %% Logical operators
  120. -define(is_logical(Op), (Op =:= 'and' orelse Op =:= 'or')).
  121. -define(RAISE(_EXP_, _ERROR_),
  122. ?RAISE(_EXP_, _ = do_nothing, _ERROR_)).
  123. -define(RAISE(_EXP_, _EXP_ON_FAIL_, _ERROR_),
  124. fun() ->
  125. try (_EXP_)
  126. catch _EXCLASS_:_EXCPTION_:_ST_ ->
  127. _EXP_ON_FAIL_,
  128. throw(_ERROR_)
  129. end
  130. end()).
  131. -define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
  132. -define(CLUSTER_CALL(Func, Args, ResParttern),
  133. fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of
  134. {ResL, []} ->
  135. case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
  136. [] -> ResL;
  137. ErrL ->
  138. ?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
  139. throw({Func, ErrL})
  140. end;
  141. {ResL, BadNodes} ->
  142. ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
  143. throw({Func, {failed_on_nodes, BadNodes}})
  144. end end()).
  145. %% Tables
  146. -define(RULE_TAB, emqx_rule).
  147. -define(ACTION_TAB, emqx_rule_action).
  148. -define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params).
  149. -define(RES_TAB, emqx_resource).
  150. -define(RES_PARAMS_TAB, emqx_resource_params).
  151. -define(RULE_HOOKS, emqx_rule_hooks).
  152. -define(RES_TYPE_TAB, emqx_resource_type).