emqx_rule_api_schema.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_api_schema).
  17. -behaviour(hocon_schema).
  18. -include_lib("typerefl/include/types.hrl").
  19. -include_lib("hocon/include/hoconsc.hrl").
  20. -include_lib("emqx/include/logger.hrl").
  21. -include("rule_engine.hrl").
  22. -export([check_params/2]).
  23. -export([namespace/0, roots/0, fields/1]).
  %% Root names that `check_params/2' accepts as its schema tag.
  -type tag() :: rule_creation | rule_test | rule_engine | rule_apply_test.

  %% Check `Params' against the root of this schema module named by `Tag'.
  %%
  %% `Params' is wrapped under the binary form of `Tag' and handed to
  %% `hocon_tconf:check_plain/4', restricted to that single root, with
  %% `atom_key => true' and `required => false'.
  %%
  %% Returns `{ok, Checked}' with the value found under `Tag' in the result.
  %% A reason thrown by the check is logged at `info' level and returned as
  %% `{error, Reason}'. Only the check call itself is covered by the `catch'.
  -spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
  check_params(Params, Tag) ->
      RootName = atom_to_binary(Tag),
      RawConf = #{RootName => Params},
      CheckOpts = #{atom_key => true, required => false},
      try hocon_tconf:check_plain(?MODULE, RawConf, CheckOpts, [Tag]) of
          #{Tag := Checked} ->
              {ok, Checked}
      catch
          throw:Reason ->
              ?SLOG(
                  info,
                  #{msg => "check_rule_params_failed", reason => Reason},
                  #{tag => ?TAG}
              ),
              {error, Reason}
      end.
  43. %%======================================================================================
  44. %% Hocon Schema Definitions
  %% `hocon_schema' callback: the namespace string for this schema module.
  namespace() ->
      "rule_engine".
  %% `hocon_schema' callback: the root entries of this schema.
  %% Each root is a reference to the struct of the same name in this module,
  %% paired with its description.
  roots() ->
      Root = fun(Name, Desc) -> {Name, sc(ref(Name), #{desc => Desc})} end,
      [
          Root("rule_engine", ?DESC("root_rule_engine")),
          Root("rule_creation", ?DESC("root_rule_creation")),
          Root("rule_info", ?DESC("root_rule_info")),
          Root("rule_events", ?DESC("root_rule_events")),
          Root("rule_test", ?DESC("root_rule_test")),
          Root("rule_apply_test", ?DESC("root_apply_rule_test"))
      ].
  %% `hocon_schema' callback: the field list for each struct name used by this
  %% module. Field order inside every list is significant and is kept as-is.

  %% Engine settings and rule bodies are delegated to `emqx_rule_engine_schema'.
  fields("rule_engine") ->
      emqx_rule_engine_schema:rule_engine_settings();
  fields("rule_creation") ->
      emqx_rule_engine_schema:fields("rules");
  %% Rule id, "from" and "created_at", followed by all "rule_creation" fields.
  fields("rule_info") ->
      From = sc(hoconsc:array(binary()), #{desc => ?DESC("ri_from"), example => "t/#"}),
      CreatedAt = sc(binary(), #{
          desc => ?DESC("ri_created_at"),
          example => "2021-12-01T15:00:43.153+08:00"
      }),
      [rule_id(), {"from", From}, {"created_at", CreatedAt} | fields("rule_creation")];
  %% Rule id plus one "metrics" struct and an array of "node_metrics" structs.
  fields("rule_metrics") ->
      NodeMetrics = hoconsc:array(ref("node_metrics")),
      [
          rule_id(),
          {"metrics", sc(ref("metrics"), #{desc => ?DESC("ri_metrics")})},
          {"node_metrics", sc(NodeMetrics, #{desc => ?DESC("ri_node_metrics")})}
      ];
  %% TODO: this API can be deleted once the Dashboard no longer depends on it
  fields("rule_events") ->
      EventEnum = hoconsc:enum(emqx_rule_events:event_topics_enum()),
      [
          {"event", sc(EventEnum, #{desc => ?DESC("rs_event"), required => true})},
          {"title", sc(binary(), #{desc => ?DESC("rs_title"), example => "some title"})},
          {"description",
              sc(binary(), #{desc => ?DESC("rs_description"), example => "some desc"})},
          {"columns", sc(map(), #{desc => ?DESC("rs_columns")})},
          {"test_columns", sc(map(), #{desc => ?DESC("rs_test_columns")})},
          {"sql_example", sc(binary(), #{desc => ?DESC("rs_sql_example")})}
      ];
  %% Input context plus the (required) SQL to test.
  fields("rule_test") ->
      Sql = sc(binary(), #{desc => ?DESC("test_sql"), required => true}),
      [rule_input_message_context(), {"sql", Sql}];
  %% Input context plus a boolean flag that defaults to true.
  fields("rule_apply_test") ->
      StopAfterRender = sc(typerefl:boolean(), #{
          desc => ?DESC("stop_action_after_template_render"),
          default => true
      }),
      [
          rule_input_message_context(),
          {"stop_action_after_template_rendering", StopAfterRender}
      ];
  %% Counters are non-negative integers; the "*.rate*" entries are floats.
  fields("metrics") ->
      Counter = non_neg_integer(),
      Rate = float(),
      [
          {"matched", sc(Counter, #{desc => ?DESC("metrics_sql_matched")})},
          {"matched.rate", sc(Rate, #{desc => ?DESC("metrics_sql_matched_rate")})},
          {"matched.rate.max", sc(Rate, #{desc => ?DESC("metrics_sql_matched_rate_max")})},
          {"matched.rate.last5m",
              sc(Rate, #{desc => ?DESC("metrics_sql_matched_rate_last5m")})},
          {"passed", sc(Counter, #{desc => ?DESC("metrics_sql_passed")})},
          {"failed", sc(Counter, #{desc => ?DESC("metrics_sql_failed")})},
          {"failed.exception", sc(Counter, #{desc => ?DESC("metrics_sql_failed_exception")})},
          {"failed.unknown", sc(Counter, #{desc => ?DESC("metrics_sql_failed_unknown")})},
          {"actions.total", sc(Counter, #{desc => ?DESC("metrics_actions_total")})},
          {"actions.success", sc(Counter, #{desc => ?DESC("metrics_actions_success")})},
          {"actions.failed", sc(Counter, #{desc => ?DESC("metrics_actions_failed")})},
          {"actions.failed.out_of_service",
              sc(Counter, #{desc => ?DESC("metrics_actions_failed_out_of_service")})},
          {"actions.failed.unknown",
              sc(Counter, #{desc => ?DESC("metrics_actions_failed_unknown")})},
          {"actions.discarded", sc(Counter, #{desc => ?DESC("metrics_actions_discarded")})}
      ];
  %% A "node" name followed by the same fields as "metrics".
  fields("node_metrics") ->
      Node = sc(binary(), #{desc => ?DESC("node_node"), example => "emqx@127.0.0.1"}),
      [{"node", Node} | fields("metrics")];
  %% ---- Test-context structs, one per event --------------------------------
  fields("ctx_pub") ->
      Evt = 'message.publish',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"id", sc(binary(), #{desc => ?DESC("event_id")})}
      ] ++ msg_event_common_fields();
  fields("ctx_sub") ->
      Evt = 'session.subscribed',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)}
      ] ++ msg_event_common_fields();
  %% Same as "ctx_sub", with the event-identifying fields swapped for this event.
  fields("ctx_unsub") ->
      Evt = 'session.unsubscribed',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)}
      ] ++ without(["event_type", "event_topic", "event"], fields("ctx_sub"));
  fields("ctx_delivered") ->
      Evt = 'message.delivered',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"id", sc(binary(), #{desc => ?DESC("event_id")})},
          {"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
          {"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
      ] ++ msg_event_common_fields();
  %% Same as "ctx_delivered", with the event-identifying fields swapped for this event.
  fields("ctx_acked") ->
      Evt = 'message.acked',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)}
      ] ++ without(["event_type", "event_topic", "event"], fields("ctx_delivered"));
  fields("ctx_dropped") ->
      Evt = 'message.dropped',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"id", sc(binary(), #{desc => ?DESC("event_id")})},
          {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})}
      ] ++ msg_event_common_fields();
  fields("ctx_connected") ->
      Evt = 'client.connected',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
          {"username", sc(binary(), #{desc => ?DESC("event_username")})},
          {"mountpoint", sc(binary(), #{desc => ?DESC("event_mountpoint")})},
          {"peername", sc(binary(), #{desc => ?DESC("event_peername")})},
          {"sockname", sc(binary(), #{desc => ?DESC("event_sockname")})},
          {"proto_name", sc(binary(), #{desc => ?DESC("event_proto_name")})},
          {"proto_ver", sc(binary(), #{desc => ?DESC("event_proto_ver")})},
          {"keepalive", sc(integer(), #{desc => ?DESC("event_keepalive")})},
          {"clean_start",
              sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
          {"expiry_interval", sc(integer(), #{desc => ?DESC("event_expiry_interval")})},
          {"is_bridge", sc(boolean(), #{desc => ?DESC("event_is_bridge"), default => false})},
          {"connected_at", sc(integer(), #{desc => ?DESC("event_connected_at")})}
      ];
  fields("ctx_disconnected") ->
      Evt = 'client.disconnected',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
          {"username", sc(binary(), #{desc => ?DESC("event_username")})},
          {"reason", sc(binary(), #{desc => ?DESC("event_ctx_disconnected_reason")})},
          {"peername", sc(binary(), #{desc => ?DESC("event_peername")})},
          {"sockname", sc(binary(), #{desc => ?DESC("event_sockname")})},
          {"disconnected_at", sc(integer(), #{desc => ?DESC("event_ctx_disconnected_da")})}
      ];
  fields("ctx_connack") ->
      Evt = 'client.connack',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_connack_reason_code")})},
          {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
          {"clean_start",
              sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
          {"username", sc(binary(), #{desc => ?DESC("event_username")})},
          {"peername", sc(binary(), #{desc => ?DESC("event_peername")})},
          {"sockname", sc(binary(), #{desc => ?DESC("event_sockname")})},
          {"proto_name", sc(binary(), #{desc => ?DESC("event_proto_name")})},
          {"proto_ver", sc(binary(), #{desc => ?DESC("event_proto_ver")})},
          {"keepalive", sc(integer(), #{desc => ?DESC("event_keepalive")})},
          {"expiry_interval", sc(integer(), #{desc => ?DESC("event_expiry_interval")})},
          {"connected_at", sc(integer(), #{desc => ?DESC("event_connected_at")})}
      ];
  fields("ctx_check_authz_complete") ->
      Evt = 'client.check_authz_complete',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
          {"username", sc(binary(), #{desc => ?DESC("event_username")})},
          {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
          {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
          {"action", sc(binary(), #{desc => ?DESC("event_action")})},
          {"authz_source", sc(binary(), #{desc => ?DESC("event_authz_source")})},
          {"result", sc(binary(), #{desc => ?DESC("event_result")})}
      ];
  fields("ctx_check_authn_complete") ->
      Evt = 'client.check_authn_complete',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
          {"username", sc(binary(), #{desc => ?DESC("event_username")})},
          {"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_authn_reason_code")})},
          {"peername", sc(binary(), #{desc => ?DESC("event_peername")})},
          {"is_anonymous",
              sc(boolean(), #{desc => ?DESC("event_is_anonymous"), required => false})},
          {"is_superuser",
              sc(boolean(), #{desc => ?DESC("event_is_superuser"), required => false})}
      ];
  %% The "event" default here is the binary form of the event name, which makes
  %% `event_sc/1' take its binary clause.
  fields("ctx_bridge_mqtt") ->
      Evt = '$bridges/mqtt:*',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(atom_to_binary(Evt))},
          {"id", sc(binary(), #{desc => ?DESC("event_id")})},
          {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
          {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
          {"server", sc(binary(), #{desc => ?DESC("event_server")})},
          {"dup", sc(binary(), #{desc => ?DESC("event_dup")})},
          {"retain", sc(binary(), #{desc => ?DESC("event_retain")})},
          {"message_received_at", publish_received_at_sc()},
          qos()
      ];
  fields("ctx_delivery_dropped") ->
      Evt = 'delivery.dropped',
      [
          {"event_type", event_type_sc(Evt)},
          {"event", event_sc(Evt)},
          {"id", sc(binary(), #{desc => ?DESC("event_id")})},
          {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})},
          {"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
          {"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
      ] ++ msg_event_common_fields();
  fields("ctx_schema_validation_failed") ->
      Evt = 'schema.validation_failed',
      [
          {"event_type", event_type_sc(Evt)},
          {"validation", sc(binary(), #{desc => ?DESC("event_validation")})}
      ] ++ msg_event_common_fields();
  fields("ctx_message_transformation_failed") ->
      Evt = 'message.transformation_failed',
      [
          {"event_type", event_type_sc(Evt)},
          {"transformation", sc(binary(), #{desc => ?DESC("event_transformation")})}
      ] ++ msg_event_common_fields().
  %% The "context" field shared by the rule-test structs: a union over every
  %% `ctx_*' struct of this module, defaulting to an empty map.
  rule_input_message_context() ->
      CtxStructs = [
          "ctx_pub",
          "ctx_sub",
          "ctx_unsub",
          "ctx_delivered",
          "ctx_acked",
          "ctx_dropped",
          "ctx_connected",
          "ctx_disconnected",
          "ctx_connack",
          "ctx_check_authz_complete",
          "ctx_check_authn_complete",
          "ctx_bridge_mqtt",
          "ctx_delivery_dropped",
          "ctx_schema_validation_failed",
          "ctx_message_transformation_failed"
      ],
      %% The comprehension keeps the member order of the list above.
      Members = [ref(Struct) || Struct <- CtxStructs],
      Meta = #{desc => ?DESC("test_context"), default => #{}},
      {"context", sc(hoconsc:union(Members), Meta)}.
  %% The "qos" field, typed by `emqx_schema:qos/0'.
  qos() ->
      QosSchema = sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")}),
      {"qos", QosSchema}.
  %% The required binary "id" field used by the rule structs.
  rule_id() ->
      Meta = #{
          desc => ?DESC("rule_id"),
          required => true,
          example => "293fb66f"
      },
      {"id", sc(binary(), Meta)}.
  %% Shorthand for `hoconsc:mk/2': build a field schema from a type and its metadata.
  sc(FieldType, FieldMeta) ->
      hoconsc:mk(FieldType, FieldMeta).
  %% Reference to the struct named `StructName' in this module (see `fields/1').
  ref(StructName) ->
      hoconsc:ref(?MODULE, StructName).
  %% Required "event_type" schema whose type is the atom derived from `Event'
  %% by `event_to_event_type/1'.
  event_type_sc(Event) ->
      Meta = #{desc => ?DESC("event_event_type"), required => true},
      sc(event_to_event_type(Event), Meta).
  %% Convert an event name atom into its event-type atom by replacing every
  %% "." with "_", e.g. 'message.publish' -> message_publish.
  %% Names without a dot are returned unchanged.
  %%
  %% `Event' is always an atom supplied by this module, so creating the
  %% resulting atom does not expose the atom table to external input.
  -spec event_to_event_type(atom()) -> atom().
  event_to_event_type(Event) ->
      EventBin = atom_to_binary(Event),
      %% binary:replace/3 rewrites only the first match; `global' is needed
      %% so that names with more than one dot are converted completely.
      binary_to_atom(binary:replace(EventBin, <<".">>, <<"_">>, [global])).
  %% Hidden "event" schema whose default is `Event' itself.
  %% A binary `Event' (the only such case is `$bridges/...') is typed as
  %% `binary()'; any other value is used directly as the type.
  event_sc(Event) ->
      Type =
          case is_binary(Event) of
              true -> binary();
              false -> Event
          end,
      sc(Type, #{default => Event, importance => ?IMPORTANCE_HIDDEN}).
  %% Remove every entry keyed by one of `FieldNames' from the field list
  %% `Fields'. Names that are not present are ignored, and the order of the
  %% remaining entries is kept.
  without([], Fields) ->
      Fields;
  without([Name | Rest], Fields) ->
      without(Rest, proplists:delete(Name, Fields)).
  %% Integer schema described by "event_publish_received_at".
  publish_received_at_sc() ->
      Meta = #{desc => ?DESC("event_publish_received_at")},
      sc(integer(), Meta).
  %% Fields appended to several message-related `ctx_*' structs in `fields/1':
  %% five binary fields, then "publish_received_at" and "qos".
  msg_event_common_fields() ->
      BinaryFields = [
          {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
          {"username", sc(binary(), #{desc => ?DESC("event_username")})},
          {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
          {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
          {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}
      ],
      BinaryFields ++ [{"publish_received_at", publish_received_at_sc()}, qos()].