emqx_rule_engine_api.erl 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_api).
  17. -include("rule_engine.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include_lib("hocon/include/hoconsc.hrl").
  20. -include_lib("typerefl/include/types.hrl").
  21. -behaviour(minirest_api).
  22. -import(hoconsc, [mk/2, ref/2, array/1]).
  23. -export([printable_function_name/2]).
  24. %% Swagger specs from hocon schema
  25. -export([api_spec/0, paths/0, schema/1, namespace/0]).
  26. %% API callbacks
  27. -export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]).
  28. %% query callback
  29. -export([query/4]).
  30. -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))).
  31. -define(ERR_BADARGS(REASON), begin
  32. R0 = err_msg(REASON),
  33. <<"Bad Arguments: ", R0/binary>>
  34. end).
  35. -define(CHECK_PARAMS(PARAMS, TAG, EXPR),
  36. case emqx_rule_api_schema:check_params(PARAMS, TAG) of
  37. {ok, CheckedParams} ->
  38. EXPR;
  39. {error, REASON} ->
  40. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}}
  41. end
  42. ).
  43. -define(METRICS(
  44. MATCH,
  45. PASS,
  46. FAIL,
  47. FAIL_EX,
  48. FAIL_NORES,
  49. O_TOTAL,
  50. O_FAIL,
  51. O_FAIL_OOS,
  52. O_FAIL_UNKNOWN,
  53. O_SUCC,
  54. RATE,
  55. RATE_MAX,
  56. RATE_5
  57. ),
  58. #{
  59. 'matched' => MATCH,
  60. 'passed' => PASS,
  61. 'failed' => FAIL,
  62. 'failed.exception' => FAIL_EX,
  63. 'failed.no_result' => FAIL_NORES,
  64. 'actions.total' => O_TOTAL,
  65. 'actions.failed' => O_FAIL,
  66. 'actions.failed.out_of_service' => O_FAIL_OOS,
  67. 'actions.failed.unknown' => O_FAIL_UNKNOWN,
  68. 'actions.success' => O_SUCC,
  69. 'matched.rate' => RATE,
  70. 'matched.rate.max' => RATE_MAX,
  71. 'matched.rate.last5m' => RATE_5
  72. }
  73. ).
  74. -define(metrics(
  75. MATCH,
  76. PASS,
  77. FAIL,
  78. FAIL_EX,
  79. FAIL_NORES,
  80. O_TOTAL,
  81. O_FAIL,
  82. O_FAIL_OOS,
  83. O_FAIL_UNKNOWN,
  84. O_SUCC,
  85. RATE,
  86. RATE_MAX,
  87. RATE_5
  88. ),
  89. #{
  90. 'matched' := MATCH,
  91. 'passed' := PASS,
  92. 'failed' := FAIL,
  93. 'failed.exception' := FAIL_EX,
  94. 'failed.no_result' := FAIL_NORES,
  95. 'actions.total' := O_TOTAL,
  96. 'actions.failed' := O_FAIL,
  97. 'actions.failed.out_of_service' := O_FAIL_OOS,
  98. 'actions.failed.unknown' := O_FAIL_UNKNOWN,
  99. 'actions.success' := O_SUCC,
  100. 'matched.rate' := RATE,
  101. 'matched.rate.max' := RATE_MAX,
  102. 'matched.rate.last5m' := RATE_5
  103. }
  104. ).
  105. -define(RULE_QS_SCHEMA, [
  106. {<<"enable">>, atom},
  107. {<<"from">>, binary},
  108. {<<"like_id">>, binary},
  109. {<<"like_from">>, binary},
  110. {<<"match_from">>, binary},
  111. {<<"like_description">>, binary}
  112. ]).
  113. namespace() -> "rule".
  114. api_spec() ->
  115. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
  116. paths() -> ["/rule_events", "/rule_test", "/rules", "/rules/:id", "/rules/:id/reset_metrics"].
  117. error_schema(Code, Message) when is_atom(Code) ->
  118. emqx_dashboard_swagger:error_codes([Code], list_to_binary(Message)).
  119. rule_creation_schema() ->
  120. ref(emqx_rule_api_schema, "rule_creation").
  121. rule_test_schema() ->
  122. ref(emqx_rule_api_schema, "rule_test").
  123. rule_info_schema() ->
  124. ref(emqx_rule_api_schema, "rule_info").
  125. schema("/rules") ->
  126. #{
  127. 'operationId' => '/rules',
  128. get => #{
  129. tags => [<<"rules">>],
  130. description => ?DESC("api1"),
  131. parameters => [
  132. {enable,
  133. mk(boolean(), #{desc => ?DESC("api1_enable"), in => query, required => false})},
  134. {from, mk(binary(), #{desc => ?DESC("api1_from"), in => query, required => false})},
  135. {like_id,
  136. mk(binary(), #{desc => ?DESC("api1_like_id"), in => query, required => false})},
  137. {like_from,
  138. mk(binary(), #{desc => ?DESC("api1_like_from"), in => query, required => false})},
  139. {like_description,
  140. mk(binary(), #{
  141. desc => ?DESC("api1_like_description"), in => query, required => false
  142. })},
  143. {match_from,
  144. mk(binary(), #{desc => ?DESC("api1_match_from"), in => query, required => false})},
  145. ref(emqx_dashboard_swagger, page),
  146. ref(emqx_dashboard_swagger, limit)
  147. ],
  148. summary => <<"List Rules">>,
  149. responses => #{
  150. 200 =>
  151. [
  152. {data, mk(array(rule_info_schema()), #{desc => ?DESC("desc9")})},
  153. {meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
  154. ],
  155. 400 => error_schema('BAD_REQUEST', "Invalid Parameters")
  156. }
  157. },
  158. post => #{
  159. tags => [<<"rules">>],
  160. description => ?DESC("api2"),
  161. summary => <<"Create a Rule">>,
  162. 'requestBody' => rule_creation_schema(),
  163. responses => #{
  164. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  165. 201 => rule_info_schema()
  166. }
  167. }
  168. };
  169. schema("/rule_events") ->
  170. #{
  171. 'operationId' => '/rule_events',
  172. get => #{
  173. tags => [<<"rules">>],
  174. description => ?DESC("api3"),
  175. summary => <<"List Events">>,
  176. responses => #{
  177. 200 => mk(ref(emqx_rule_api_schema, "rule_events"), #{})
  178. }
  179. }
  180. };
  181. schema("/rules/:id") ->
  182. #{
  183. 'operationId' => '/rules/:id',
  184. get => #{
  185. tags => [<<"rules">>],
  186. description => ?DESC("api4"),
  187. summary => <<"Get a Rule">>,
  188. parameters => param_path_id(),
  189. responses => #{
  190. 404 => error_schema('NOT_FOUND', "Rule not found"),
  191. 200 => rule_info_schema()
  192. }
  193. },
  194. put => #{
  195. tags => [<<"rules">>],
  196. description => ?DESC("api5"),
  197. summary => <<"Update a Rule">>,
  198. parameters => param_path_id(),
  199. 'requestBody' => rule_creation_schema(),
  200. responses => #{
  201. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  202. 200 => rule_info_schema()
  203. }
  204. },
  205. delete => #{
  206. tags => [<<"rules">>],
  207. description => ?DESC("api6"),
  208. summary => <<"Delete a Rule">>,
  209. parameters => param_path_id(),
  210. responses => #{
  211. 204 => <<"Delete rule successfully">>
  212. }
  213. }
  214. };
  215. schema("/rules/:id/reset_metrics") ->
  216. #{
  217. 'operationId' => '/rules/:id/reset_metrics',
  218. put => #{
  219. tags => [<<"rules">>],
  220. description => ?DESC("api7"),
  221. summary => <<"Reset a Rule Metrics">>,
  222. parameters => param_path_id(),
  223. responses => #{
  224. 400 => error_schema('BAD_REQUEST', "RPC Call Failed"),
  225. 200 => <<"Reset Success">>
  226. }
  227. }
  228. };
  229. schema("/rule_test") ->
  230. #{
  231. 'operationId' => '/rule_test',
  232. post => #{
  233. tags => [<<"rules">>],
  234. description => ?DESC("api8"),
  235. summary => <<"Test a Rule">>,
  236. 'requestBody' => rule_test_schema(),
  237. responses => #{
  238. 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
  239. 412 => error_schema('NOT_MATCH', "SQL Not Match"),
  240. 200 => <<"Rule Test Pass">>
  241. }
  242. }
  243. }.
  244. param_path_id() ->
  245. [{id, mk(binary(), #{in => path, example => <<"my_rule_id">>})}].
  246. %%------------------------------------------------------------------------------
  247. %% Rules API
  248. %%------------------------------------------------------------------------------
  249. '/rule_events'(get, _Params) ->
  250. {200, emqx_rule_events:event_info()}.
  251. '/rules'(get, #{query_string := QueryString}) ->
  252. case
  253. emqx_mgmt_api:node_query(
  254. node(),
  255. QueryString,
  256. ?RULE_TAB,
  257. ?RULE_QS_SCHEMA,
  258. {?MODULE, query}
  259. )
  260. of
  261. {error, page_limit_invalid} ->
  262. {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
  263. Result ->
  264. {200, Result}
  265. end;
  266. '/rules'(post, #{body := Params0}) ->
  267. case maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))) of
  268. <<>> ->
  269. {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
  270. Id ->
  271. Params = filter_out_request_body(add_metadata(Params0)),
  272. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  273. case emqx_rule_engine:get_rule(Id) of
  274. {ok, _Rule} ->
  275. {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}};
  276. not_found ->
  277. case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
  278. {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
  279. [Rule] = get_one_rule(AllRules, Id),
  280. {201, format_rule_resp(Rule)};
  281. {error, Reason} ->
  282. ?SLOG(error, #{
  283. msg => "create_rule_failed",
  284. id => Id,
  285. reason => Reason
  286. }),
  287. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
  288. end
  289. end
  290. end.
  291. '/rule_test'(post, #{body := Params}) ->
  292. ?CHECK_PARAMS(
  293. Params,
  294. rule_test,
  295. case emqx_rule_sqltester:test(CheckedParams) of
  296. {ok, Result} ->
  297. {200, Result};
  298. {error, {parse_error, Reason}} ->
  299. {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
  300. {error, nomatch} ->
  301. {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}};
  302. {error, Reason} ->
  303. {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
  304. end
  305. ).
  306. '/rules/:id'(get, #{bindings := #{id := Id}}) ->
  307. case emqx_rule_engine:get_rule(Id) of
  308. {ok, Rule} ->
  309. {200, format_rule_resp(Rule)};
  310. not_found ->
  311. {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
  312. end;
  313. '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
  314. Params = filter_out_request_body(Params0),
  315. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  316. case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of
  317. {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
  318. [Rule] = get_one_rule(AllRules, Id),
  319. {200, format_rule_resp(Rule)};
  320. {error, Reason} ->
  321. ?SLOG(error, #{
  322. msg => "update_rule_failed",
  323. id => Id,
  324. reason => Reason
  325. }),
  326. {400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(Reason)}}
  327. end;
  328. '/rules/:id'(delete, #{bindings := #{id := Id}}) ->
  329. ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
  330. case emqx_conf:remove(ConfPath, #{override_to => cluster}) of
  331. {ok, _} ->
  332. {204};
  333. {error, Reason} ->
  334. ?SLOG(error, #{
  335. msg => "delete_rule_failed",
  336. id => Id,
  337. reason => Reason
  338. }),
  339. {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}}
  340. end.
  341. '/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) ->
  342. case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of
  343. ok ->
  344. {200, <<"Reset Success">>};
  345. Failed ->
  346. {400, #{
  347. code => 'BAD_REQUEST',
  348. message => err_msg(Failed)
  349. }}
  350. end.
  351. %%------------------------------------------------------------------------------
  352. %% Internal functions
  353. %%------------------------------------------------------------------------------
  354. err_msg(Msg) -> emqx_misc:readable_error_msg(Msg).
  355. format_rule_resp(Rules) when is_list(Rules) ->
  356. [format_rule_resp(R) || R <- Rules];
  357. format_rule_resp({Id, Rule}) ->
  358. format_rule_resp(Rule#{id => Id});
  359. format_rule_resp(#{
  360. id := Id,
  361. name := Name,
  362. created_at := CreatedAt,
  363. from := Topics,
  364. actions := Action,
  365. sql := SQL,
  366. enable := Enable,
  367. description := Descr
  368. }) ->
  369. NodeMetrics = get_rule_metrics(Id),
  370. #{
  371. id => Id,
  372. name => Name,
  373. from => Topics,
  374. actions => format_action(Action),
  375. sql => SQL,
  376. metrics => aggregate_metrics(NodeMetrics),
  377. node_metrics => NodeMetrics,
  378. enable => Enable,
  379. created_at => format_datetime(CreatedAt, millisecond),
  380. description => Descr
  381. }.
  382. format_datetime(Timestamp, Unit) ->
  383. list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
  384. format_action(Actions) ->
  385. [do_format_action(Act) || Act <- Actions].
  386. do_format_action(#{mod := Mod, func := Func, args := Args}) ->
  387. #{
  388. function => printable_function_name(Mod, Func),
  389. args => maps:remove(preprocessed_tmpl, Args)
  390. };
  391. do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
  392. BridgeChannelId.
  393. printable_function_name(emqx_rule_actions, Func) ->
  394. Func;
  395. printable_function_name(Mod, Func) ->
  396. list_to_binary(lists:concat([Mod, ":", Func])).
  397. get_rule_metrics(Id) ->
  398. Format = fun(
  399. Node,
  400. #{
  401. counters :=
  402. #{
  403. 'matched' := Matched,
  404. 'passed' := Passed,
  405. 'failed' := Failed,
  406. 'failed.exception' := FailedEx,
  407. 'failed.no_result' := FailedNoRes,
  408. 'actions.total' := OTotal,
  409. 'actions.failed' := OFailed,
  410. 'actions.failed.out_of_service' := OFailedOOS,
  411. 'actions.failed.unknown' := OFailedUnknown,
  412. 'actions.success' := OFailedSucc
  413. },
  414. rate :=
  415. #{
  416. 'matched' :=
  417. #{current := Current, max := Max, last5m := Last5M}
  418. }
  419. }
  420. ) ->
  421. #{
  422. metrics => ?METRICS(
  423. Matched,
  424. Passed,
  425. Failed,
  426. FailedEx,
  427. FailedNoRes,
  428. OTotal,
  429. OFailed,
  430. OFailedOOS,
  431. OFailedUnknown,
  432. OFailedSucc,
  433. Current,
  434. Max,
  435. Last5M
  436. ),
  437. node => Node
  438. }
  439. end,
  440. [
  441. Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id))
  442. || Node <- mria_mnesia:running_nodes()
  443. ].
  444. aggregate_metrics(AllMetrics) ->
  445. InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
  446. lists:foldl(
  447. fun(
  448. #{
  449. metrics := ?metrics(
  450. Match1,
  451. Passed1,
  452. Failed1,
  453. FailedEx1,
  454. FailedNoRes1,
  455. OTotal1,
  456. OFailed1,
  457. OFailedOOS1,
  458. OFailedUnknown1,
  459. OFailedSucc1,
  460. Rate1,
  461. RateMax1,
  462. Rate5m1
  463. )
  464. },
  465. ?metrics(
  466. Match0,
  467. Passed0,
  468. Failed0,
  469. FailedEx0,
  470. FailedNoRes0,
  471. OTotal0,
  472. OFailed0,
  473. OFailedOOS0,
  474. OFailedUnknown0,
  475. OFailedSucc0,
  476. Rate0,
  477. RateMax0,
  478. Rate5m0
  479. )
  480. ) ->
  481. ?METRICS(
  482. Match1 + Match0,
  483. Passed1 + Passed0,
  484. Failed1 + Failed0,
  485. FailedEx1 + FailedEx0,
  486. FailedNoRes1 + FailedNoRes0,
  487. OTotal1 + OTotal0,
  488. OFailed1 + OFailed0,
  489. OFailedOOS1 + OFailedOOS0,
  490. OFailedUnknown1 + OFailedUnknown0,
  491. OFailedSucc1 + OFailedSucc0,
  492. Rate1 + Rate0,
  493. RateMax1 + RateMax0,
  494. Rate5m1 + Rate5m0
  495. )
  496. end,
  497. InitMetrics,
  498. AllMetrics
  499. ).
  500. get_one_rule(AllRules, Id) ->
  501. [R || R = #{id := Id0} <- AllRules, Id0 == Id].
  502. add_metadata(Params) ->
  503. Params#{
  504. <<"metadata">> => #{
  505. <<"created_at">> => emqx_rule_engine:now_ms()
  506. }
  507. }.
  508. filter_out_request_body(Conf) ->
  509. ExtraConfs = [
  510. <<"id">>,
  511. <<"status">>,
  512. <<"node_status">>,
  513. <<"node_metrics">>,
  514. <<"metrics">>,
  515. <<"node">>
  516. ],
  517. maps:without(ExtraConfs, Conf).
  518. query(Tab, {Qs, Fuzzy}, Start, Limit) ->
  519. Ms = qs2ms(),
  520. FuzzyFun = fuzzy_match_fun(Qs, Ms, Fuzzy),
  521. emqx_mgmt_api:select_table_with_count(
  522. Tab, {Ms, FuzzyFun}, Start, Limit, fun format_rule_resp/1
  523. ).
  524. %% rule is not a record, so everything is fuzzy filter.
  525. qs2ms() ->
  526. [{'_', [], ['$_']}].
  527. fuzzy_match_fun(Qs, Ms, Fuzzy) ->
  528. MsC = ets:match_spec_compile(Ms),
  529. fun(Rows) ->
  530. Ls = ets:match_spec_run(Rows, MsC),
  531. lists:filter(
  532. fun(E) ->
  533. run_qs_match(E, Qs) andalso
  534. run_fuzzy_match(E, Fuzzy)
  535. end,
  536. Ls
  537. )
  538. end.
  539. run_qs_match(_, []) ->
  540. true;
  541. run_qs_match(E = {_Id, #{enable := Enable}}, [{enable, '=:=', Pattern} | Qs]) ->
  542. Enable =:= Pattern andalso run_qs_match(E, Qs);
  543. run_qs_match(E = {_Id, #{from := From}}, [{from, '=:=', Pattern} | Qs]) ->
  544. lists:member(Pattern, From) andalso run_qs_match(E, Qs);
  545. run_qs_match(E, [_ | Qs]) ->
  546. run_qs_match(E, Qs).
  547. run_fuzzy_match(_, []) ->
  548. true;
  549. run_fuzzy_match(E = {Id, _}, [{id, like, Pattern} | Fuzzy]) ->
  550. binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
  551. run_fuzzy_match(E = {_Id, #{description := Desc}}, [{description, like, Pattern} | Fuzzy]) ->
  552. binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
  553. run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, match, Pattern} | Fuzzy]) ->
  554. lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) andalso
  555. run_fuzzy_match(E, Fuzzy);
  556. run_fuzzy_match(E = {_Id, #{from := Topics}}, [{from, like, Pattern} | Fuzzy]) ->
  557. lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics) andalso
  558. run_fuzzy_match(E, Fuzzy);
  559. run_fuzzy_match(E, [_ | Fuzzy]) ->
  560. run_fuzzy_match(E, Fuzzy).