emqx_rule_engine_schema.erl 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_schema).
  17. -include_lib("typerefl/include/types.hrl").
  18. -include_lib("hocon/include/hoconsc.hrl").
  19. -behaviour(hocon_schema).
  20. -export([
  21. namespace/0,
  22. roots/0,
  23. fields/1,
  24. desc/1,
  25. post_config_update/5
  26. ]).
  27. -export([validate_sql/1]).
  28. namespace() -> rule_engine.
  29. roots() -> ["rule_engine"].
  30. fields("rule_engine") ->
  31. [
  32. {ignore_sys_message,
  33. ?HOCON(boolean(), #{default => true, desc => ?DESC("rule_engine_ignore_sys_message")})},
  34. {rules,
  35. ?HOCON(hoconsc:map("id", ?R_REF("rules")), #{
  36. desc => ?DESC("rule_engine_rules"), default => #{}
  37. })},
  38. {jq_function_default_timeout,
  39. ?HOCON(
  40. emqx_schema:duration_ms(),
  41. #{
  42. default => "10s",
  43. desc => ?DESC("rule_engine_jq_function_default_timeout")
  44. }
  45. )},
  46. {jq_implementation_module,
  47. ?HOCON(
  48. hoconsc:enum([jq_nif, jq_port]),
  49. #{
  50. default => jq_nif,
  51. mapping => "jq.jq_implementation_module",
  52. desc => ?DESC("rule_engine_jq_implementation_module")
  53. }
  54. )}
  55. ];
  56. fields("rules") ->
  57. [
  58. rule_name(),
  59. {"sql",
  60. ?HOCON(
  61. binary(),
  62. #{
  63. desc => ?DESC("rules_sql"),
  64. example => "SELECT * FROM \"test/topic\" WHERE payload.x = 1",
  65. required => true,
  66. validator => fun ?MODULE:validate_sql/1
  67. }
  68. )},
  69. {"actions",
  70. ?HOCON(
  71. ?ARRAY(?UNION(actions())),
  72. #{
  73. desc => ?DESC("rules_actions"),
  74. default => [],
  75. example => [
  76. <<"webhook:my_webhook">>,
  77. #{
  78. function => republish,
  79. args => #{
  80. topic => <<"t/1">>, payload => <<"${payload}">>
  81. }
  82. },
  83. #{function => console}
  84. ]
  85. }
  86. )},
  87. {"enable", ?HOCON(boolean(), #{desc => ?DESC("rules_enable"), default => true})},
  88. {"description",
  89. ?HOCON(
  90. binary(),
  91. #{
  92. desc => ?DESC("rules_description"),
  93. example => "Some description",
  94. default => <<>>
  95. }
  96. )},
  97. {"metadata", ?HOCON(map(), #{desc => ?DESC("rules_metadata")})}
  98. ];
  99. fields("builtin_action_republish") ->
  100. [
  101. {function, ?HOCON(republish, #{desc => ?DESC("republish_function")})},
  102. {args, ?HOCON(?R_REF("republish_args"), #{default => #{}})}
  103. ];
  104. fields("builtin_action_console") ->
  105. [
  106. {function, ?HOCON(console, #{desc => ?DESC("console_function")})}
  107. %% we may support some args for the console action in the future
  108. %, {args, sc(map(), #{desc => "The arguments of the built-in 'console' action",
  109. % default => #{}})}
  110. ];
  111. fields("user_provided_function") ->
  112. [
  113. {function,
  114. ?HOCON(
  115. binary(),
  116. #{
  117. desc => ?DESC("user_provided_function_function"),
  118. required => true,
  119. example => "module:function"
  120. }
  121. )},
  122. {args,
  123. ?HOCON(
  124. map(),
  125. #{
  126. desc => ?DESC("user_provided_function_args"),
  127. default => #{}
  128. }
  129. )}
  130. ];
  131. fields("republish_args") ->
  132. [
  133. {topic,
  134. ?HOCON(
  135. binary(),
  136. #{
  137. desc => ?DESC("republish_args_topic"),
  138. required => true,
  139. example => <<"a/1">>
  140. }
  141. )},
  142. {qos,
  143. ?HOCON(
  144. qos(),
  145. #{
  146. desc => ?DESC("republish_args_qos"),
  147. default => <<"${qos}">>,
  148. example => <<"${qos}">>
  149. }
  150. )},
  151. {retain,
  152. ?HOCON(
  153. hoconsc:union([boolean(), binary()]),
  154. #{
  155. desc => ?DESC("republish_args_retain"),
  156. default => <<"${retain}">>,
  157. example => <<"${retain}">>
  158. }
  159. )},
  160. {payload,
  161. ?HOCON(
  162. binary(),
  163. #{
  164. desc => ?DESC("republish_args_payload"),
  165. default => <<"${payload}">>,
  166. example => <<"${payload}">>
  167. }
  168. )},
  169. {user_properties,
  170. ?HOCON(
  171. binary(),
  172. #{
  173. desc => ?DESC("republish_args_user_properties"),
  174. default => <<"${user_properties}">>,
  175. example => <<"${pub_props.'User-Property'}">>
  176. }
  177. )}
  178. ].
  179. desc("rule_engine") ->
  180. ?DESC("desc_rule_engine");
  181. desc("rules") ->
  182. ?DESC("desc_rules");
  183. desc("builtin_action_republish") ->
  184. ?DESC("desc_builtin_action_republish");
  185. desc("builtin_action_console") ->
  186. ?DESC("desc_builtin_action_console");
  187. desc("user_provided_function") ->
  188. ?DESC("desc_user_provided_function");
  189. desc("republish_args") ->
  190. ?DESC("desc_republish_args");
  191. desc(_) ->
  192. undefined.
  193. rule_name() ->
  194. {"name",
  195. ?HOCON(
  196. binary(),
  197. #{
  198. desc => ?DESC("rules_name"),
  199. default => <<"">>,
  200. required => false,
  201. example => "foo"
  202. }
  203. )}.
  204. actions() ->
  205. [
  206. binary(),
  207. ?R_REF("builtin_action_republish"),
  208. ?R_REF("builtin_action_console"),
  209. ?R_REF("user_provided_function")
  210. ].
  211. qos() ->
  212. ?UNION([emqx_schema:qos(), binary()]).
  213. validate_sql(Sql) ->
  214. case emqx_rule_sqlparser:parse(Sql) of
  215. {ok, _Result} -> ok;
  216. {error, Reason} -> {error, Reason}
  217. end.
  218. post_config_update(
  219. [rule_engine, jq_implementation_module],
  220. _Req,
  221. NewSysConf,
  222. _OldSysConf,
  223. _AppEnvs
  224. ) ->
  225. jq:set_implementation_module(NewSysConf),
  226. ok.