emqx_rule_engine_api.erl 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_api).
  17. -include("rule_engine.hrl").
  18. -include_lib("emqx/include/logger.hrl").
%% REST route declarations. Each `-rest_api' attribute pairs an HTTP method
%% and path with the name of a handler function (`func') of this module.

%% Rules
-rest_api(#{name => create_rule,
            method => 'POST',
            path => "/rules/",
            func => create_rule,
            descr => "Create a rule"
           }).
-rest_api(#{name => update_rule,
            method => 'PUT',
            path => "/rules/:bin:id",
            func => update_rule,
            descr => "Update a rule"
           }).
-rest_api(#{name => list_rules,
            method => 'GET',
            path => "/rules/",
            func => list_rules,
            descr => "A list of all rules"
           }).
-rest_api(#{name => show_rule,
            method => 'GET',
            path => "/rules/:bin:id",
            func => show_rule,
            descr => "Show a rule"
           }).
-rest_api(#{name => delete_rule,
            method => 'DELETE',
            path => "/rules/:bin:id",
            func => delete_rule,
            descr => "Delete a rule"
           }).
%% Actions
-rest_api(#{name => list_actions,
            method => 'GET',
            path => "/actions/",
            func => list_actions,
            descr => "A list of all actions"
           }).
-rest_api(#{name => show_action,
            method => 'GET',
            path => "/actions/:atom:name",
            func => show_action,
            descr => "Show an action"
           }).
%% Resources
-rest_api(#{name => list_resources,
            method => 'GET',
            path => "/resources/",
            func => list_resources,
            descr => "A list of all resources"
           }).
-rest_api(#{name => create_resource,
            method => 'POST',
            path => "/resources/",
            func => create_resource,
            descr => "Create a resource"
           }).
-rest_api(#{name => update_resource,
            method => 'PUT',
            path => "/resources/:bin:id",
            func => update_resource,
            descr => "Update a resource"
           }).
-rest_api(#{name => show_resource,
            method => 'GET',
            path => "/resources/:bin:id",
            func => show_resource,
            descr => "Show a resource"
           }).
-rest_api(#{name => get_resource_status,
            method => 'GET',
            path => "/resource_status/:bin:id",
            func => get_resource_status,
            descr => "Get status of a resource"
           }).
-rest_api(#{name => start_resource,
            method => 'POST',
            path => "/resources/:bin:id",
            func => start_resource,
            descr => "Start a resource"
           }).
-rest_api(#{name => delete_resource,
            method => 'DELETE',
            path => "/resources/:bin:id",
            func => delete_resource,
            descr => "Delete a resource"
           }).
%% Resource types
-rest_api(#{name => list_resource_types,
            method => 'GET',
            path => "/resource_types/",
            func => list_resource_types,
            descr => "List all resource types"
           }).
-rest_api(#{name => show_resource_type,
            method => 'GET',
            path => "/resource_types/:atom:name",
            func => show_resource_type,
            descr => "Show a resource type"
           }).
-rest_api(#{name => list_resources_by_type,
            method => 'GET',
            path => "/resource_types/:atom:type/resources",
            func => list_resources_by_type,
            descr => "List all resources of a resource type"
           }).
%% Events
-rest_api(#{name => list_events,
            method => 'GET',
            path => "/rule_events/",
            func => list_events,
            descr => "List all events with detailed info"
           }).
  127. -export([ create_rule/2
  128. , update_rule/2
  129. , list_rules/2
  130. , show_rule/2
  131. , delete_rule/2
  132. ]).
  133. -export([ list_actions/2
  134. , show_action/2
  135. ]).
  136. -export([ create_resource/2
  137. , list_resources/2
  138. , show_resource/2
  139. , get_resource_status/2
  140. , start_resource/2
  141. , delete_resource/2
  142. , update_resource/2
  143. ]).
  144. -export([ list_resource_types/2
  145. , list_resources_by_type/2
  146. , show_resource_type/2
  147. ]).
  148. -export([list_events/2]).
  149. -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))).
  150. -define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))).
  151. -define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))).
  152. -define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))).
  153. -define(ERR_DEP_RULES_EXISTS(RULEIDS), list_to_binary(io_lib:format("Found rules ~0p depends on this resource, disable them first", [(RULEIDS)]))).
  154. -define(ERR_BADARGS(REASON),
  155. begin
  156. R0 = list_to_binary(io_lib:format("~0p", [REASON])),
  157. <<"Bad Arguments: ", R0/binary>>
  158. end).
  159. -dialyzer({nowarn_function, [create_rule/2,
  160. test_rule_sql/1,
  161. do_create_rule/1,
  162. update_rule/2
  163. ]}).
  164. %%------------------------------------------------------------------------------
  165. %% Rules API
  166. %%------------------------------------------------------------------------------
  167. create_rule(_Bindings, Params) ->
  168. if_test(fun() -> test_rule_sql(Params) end,
  169. fun() -> do_create_rule(Params) end,
  170. Params).
  171. test_rule_sql(Params) ->
  172. case emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of
  173. {ok, Result} -> return({ok, Result});
  174. {error, nomatch} -> return({error, 404, <<"SQL Not Match">>});
  175. {error, Reason} ->
  176. ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
  177. return({error, 400, ?ERR_BADARGS(Reason)})
  178. end.
  179. do_create_rule(Params) ->
  180. case emqx_rule_engine:create_rule(parse_rule_params(Params)) of
  181. {ok, Rule} -> return({ok, record_to_map(Rule)});
  182. {error, {action_not_found, ActionName}} ->
  183. return({error, 400, ?ERR_NO_ACTION(ActionName)});
  184. {error, Reason} ->
  185. ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
  186. return({error, 400, ?ERR_BADARGS(Reason)})
  187. end.
  188. update_rule(#{id := Id}, Params) ->
  189. case emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of
  190. {ok, Rule} -> return({ok, record_to_map(Rule)});
  191. {error, {not_found, RuleId}} ->
  192. return({error, 400, ?ERR_NO_RULE(RuleId)});
  193. {error, Reason} ->
  194. ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
  195. return({error, 400, ?ERR_BADARGS(Reason)})
  196. end.
  197. list_rules(_Bindings, _Params) ->
  198. return_all(emqx_rule_registry:get_rules_ordered_by_ts()).
  199. show_rule(#{id := Id}, _Params) ->
  200. reply_with(fun emqx_rule_registry:get_rule/1, Id).
  201. delete_rule(#{id := Id}, _Params) ->
  202. ok = emqx_rule_engine:delete_rule(Id),
  203. return(ok).
  204. %%------------------------------------------------------------------------------
  205. %% Actions API
  206. %%------------------------------------------------------------------------------
  207. list_actions(#{}, _Params) ->
  208. return_all(
  209. sort_by_title(action,
  210. emqx_rule_registry:get_actions())).
  211. show_action(#{name := Name}, _Params) ->
  212. reply_with(fun emqx_rule_registry:find_action/1, Name).
  213. %%------------------------------------------------------------------------------
  214. %% Resources API
  215. %%------------------------------------------------------------------------------
  216. create_resource(#{}, Params) ->
  217. case parse_resource_params(Params) of
  218. {ok, ParsedParams} ->
  219. if_test(fun() -> do_create_resource(test_resource, ParsedParams) end,
  220. fun() -> do_create_resource(create_resource, ParsedParams) end,
  221. Params);
  222. {error, Reason} ->
  223. ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
  224. return({error, 400, ?ERR_BADARGS(Reason)})
  225. end.
  226. do_create_resource(Create, ParsedParams) ->
  227. case emqx_rule_engine:Create(ParsedParams) of
  228. ok ->
  229. return(ok);
  230. {ok, Resource} ->
  231. return({ok, record_to_map(Resource)});
  232. {error, {resource_type_not_found, Type}} ->
  233. return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
  234. {error, {init_resource, _}} ->
  235. return({error, 500, <<"Init resource failure!">>});
  236. {error, Reason} ->
  237. ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
  238. return({error, 400, ?ERR_BADARGS(Reason)})
  239. end.
  240. list_resources(#{}, _Params) ->
  241. Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
  242. Data = lists:map(fun(Res = #{id := Id}) ->
  243. Status = lists:all(fun(Node) ->
  244. case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of
  245. {ok, #resource_params{status = #{is_alive := true}}} -> true;
  246. _ -> false
  247. end
  248. end, ekka_mnesia:running_nodes()),
  249. maps:put(status, Status, Res)
  250. end, Data0),
  251. return({ok, Data}).
  252. list_resources_by_type(#{type := Type}, _Params) ->
  253. return_all(emqx_rule_registry:get_resources_by_type(Type)).
  254. show_resource(#{id := Id}, _Params) ->
  255. case emqx_rule_registry:find_resource(Id) of
  256. {ok, R} ->
  257. Status =
  258. [begin
  259. {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]),
  260. maps:put(node, Node, St)
  261. end || Node <- ekka_mnesia:running_nodes()],
  262. return({ok, maps:put(status, Status, record_to_map(R))});
  263. not_found ->
  264. return({error, 404, <<"Not Found">>})
  265. end.
  266. get_resource_status(#{id := Id}, _Params) ->
  267. case emqx_rule_engine:get_resource_status(Id) of
  268. {ok, Status} ->
  269. return({ok, Status});
  270. {error, {resource_not_found, ResId}} ->
  271. return({error, 400, ?ERR_NO_RESOURCE(ResId)})
  272. end.
  273. start_resource(#{id := Id}, _Params) ->
  274. case emqx_rule_engine:start_resource(Id) of
  275. ok ->
  276. return(ok);
  277. {error, {resource_not_found, ResId}} ->
  278. return({error, 400, ?ERR_NO_RESOURCE(ResId)});
  279. {error, Reason} ->
  280. ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
  281. return({error, 400, ?ERR_BADARGS(Reason)})
  282. end.
  283. update_resource(#{id := Id}, NewParams) ->
  284. P1 = case proplists:get_value(<<"description">>, NewParams) of
  285. undefined -> #{};
  286. Value -> #{<<"description">> => Value}
  287. end,
  288. P2 = case proplists:get_value(<<"config">>, NewParams) of
  289. undefined -> #{};
  290. [{}] -> #{};
  291. Config -> #{<<"config">> => ?RAISE(json_term_to_map(Config), {invalid_config, Config})}
  292. end,
  293. case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of
  294. ok ->
  295. return(ok);
  296. {error, not_found} ->
  297. return({error, 400, <<"Resource not found:", Id/binary>>});
  298. {error, {init_resource, _}} ->
  299. return({error, 500, <<"Init resource failure:", Id/binary>>});
  300. {error, {dependent_rules_exists, RuleIds}} ->
  301. return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)});
  302. {error, Reason} ->
  303. ?LOG(error, "Resource update failed: ~0p", [Reason]),
  304. return({error, 400, ?ERR_BADARGS(Reason)})
  305. end.
  306. delete_resource(#{id := Id}, _Params) ->
  307. case emqx_rule_engine:delete_resource(Id) of
  308. ok -> return(ok);
  309. {error, not_found} -> return(ok);
  310. {error, {dependent_rules_exists, RuleIds}} ->
  311. return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)});
  312. {error, Reason} ->
  313. return({error, 400, ?ERR_BADARGS(Reason)})
  314. end.
  315. %%------------------------------------------------------------------------------
  316. %% Resource Types API
  317. %%------------------------------------------------------------------------------
  318. list_resource_types(#{}, _Params) ->
  319. return_all(
  320. sort_by_title(resource_type,
  321. emqx_rule_registry:get_resource_types())).
  322. show_resource_type(#{name := Name}, _Params) ->
  323. reply_with(fun emqx_rule_registry:find_resource_type/1, Name).
  324. %%------------------------------------------------------------------------------
  325. %% Events API
  326. %%------------------------------------------------------------------------------
  327. list_events(#{}, _Params) ->
  328. return({ok, emqx_rule_events:event_info()}).
  329. %%------------------------------------------------------------------------------
  330. %% Internal functions
  331. %%------------------------------------------------------------------------------
  332. if_test(True, False, Params) ->
  333. case proplists:get_value(<<"test">>, Params) of
  334. Test when Test =:= true; Test =:= <<"true">> ->
  335. True();
  336. _ ->
  337. False()
  338. end.
  339. return_all(Records) ->
  340. Data = lists:foldr(fun maybe_record_to_map/2, [], Records),
  341. return({ok, Data}).
  342. maybe_record_to_map(Rec, Acc) ->
  343. case record_to_map(Rec) of
  344. ignore -> Acc;
  345. Map -> [Map | Acc]
  346. end.
  347. reply_with(Find, Key) ->
  348. case Find(Key) of
  349. {ok, R} ->
  350. return({ok, record_to_map(R)});
  351. not_found ->
  352. return({error, 404, <<"Not Found">>})
  353. end.
  354. record_to_map(#rule{id = Id,
  355. for = Hook,
  356. rawsql = RawSQL,
  357. actions = Actions,
  358. on_action_failed = OnFailed,
  359. enabled = Enabled,
  360. description = Descr}) ->
  361. #{id => Id,
  362. for => Hook,
  363. rawsql => RawSQL,
  364. actions => printable_actions(Actions),
  365. on_action_failed => OnFailed,
  366. metrics => get_rule_metrics(Id),
  367. enabled => Enabled,
  368. description => Descr
  369. };
  370. record_to_map(#action{hidden = true}) ->
  371. ignore;
  372. record_to_map(#action{name = Name,
  373. category = Category,
  374. app = App,
  375. for = Hook,
  376. types = Types,
  377. params_spec = Params,
  378. title = Title,
  379. description = Descr}) ->
  380. #{name => Name,
  381. category => Category,
  382. app => App,
  383. for => Hook,
  384. types => Types,
  385. params => Params,
  386. title => Title,
  387. description => Descr
  388. };
  389. record_to_map(#resource{id = Id,
  390. type = Type,
  391. config = Config,
  392. description = Descr}) ->
  393. #{id => Id,
  394. type => Type,
  395. config => Config,
  396. description => Descr
  397. };
  398. record_to_map(#resource_type{name = Name,
  399. provider = Provider,
  400. params_spec = Params,
  401. title = Title,
  402. description = Descr}) ->
  403. #{name => Name,
  404. provider => Provider,
  405. params => Params,
  406. title => Title,
  407. description => Descr
  408. }.
  409. printable_actions(Actions) ->
  410. [#{id => Id, name => Name, params => Args,
  411. metrics => get_action_metrics(Id),
  412. fallbacks => printable_actions(Fallbacks)}
  413. || #action_instance{id = Id, name = Name, args = Args, fallbacks = Fallbacks} <- Actions].
  414. parse_rule_params(Params) ->
  415. parse_rule_params(Params, #{description => <<"">>}).
  416. parse_rule_params([], Rule) ->
  417. Rule;
  418. parse_rule_params([{<<"id">>, Id} | Params], Rule) ->
  419. parse_rule_params(Params, Rule#{id => Id});
  420. parse_rule_params([{<<"rawsql">>, RawSQL} | Params], Rule) ->
  421. parse_rule_params(Params, Rule#{rawsql => RawSQL});
  422. parse_rule_params([{<<"enabled">>, Enabled} | Params], Rule) ->
  423. parse_rule_params(Params, Rule#{enabled => enabled(Enabled)});
  424. parse_rule_params([{<<"on_action_failed">>, OnFailed} | Params], Rule) ->
  425. parse_rule_params(Params, Rule#{on_action_failed => on_failed(OnFailed)});
  426. parse_rule_params([{<<"actions">>, Actions} | Params], Rule) ->
  427. parse_rule_params(Params, Rule#{actions => parse_actions(Actions)});
  428. parse_rule_params([{<<"description">>, Descr} | Params], Rule) ->
  429. parse_rule_params(Params, Rule#{description => Descr});
  430. parse_rule_params([_ | Params], Rule) ->
  431. parse_rule_params(Params, Rule).
  432. on_failed(<<"continue">>) -> continue;
  433. on_failed(<<"stop">>) -> stop;
  434. on_failed(OnFailed) -> error({invalid_on_failed, OnFailed}).
  435. enabled(Enabled) when is_boolean(Enabled) -> Enabled;
  436. enabled(Enabled) -> error({invalid_enabled, Enabled}).
  437. parse_actions(Actions) ->
  438. [parse_action(json_term_to_map(A)) || A <- Actions].
  439. parse_action(Action) ->
  440. #{name => binary_to_existing_atom(maps:get(<<"name">>, Action), utf8),
  441. args => maps:get(<<"params">>, Action, #{}),
  442. fallbacks => parse_actions(maps:get(<<"fallbacks">>, Action, []))}.
  443. parse_resource_params(Params) ->
  444. parse_resource_params(Params, #{config => #{}, description => <<"">>}).
  445. parse_resource_params([], Res) ->
  446. {ok, Res};
  447. parse_resource_params([{<<"id">>, Id} | Params], Res) ->
  448. parse_resource_params(Params, Res#{id => Id});
  449. parse_resource_params([{<<"type">>, ResourceType} | Params], Res) ->
  450. try parse_resource_params(Params, Res#{type => binary_to_existing_atom(ResourceType, utf8)})
  451. catch error:badarg ->
  452. {error, {resource_type_not_found, ResourceType}}
  453. end;
  454. parse_resource_params([{<<"config">>, Config} | Params], Res) ->
  455. parse_resource_params(Params, Res#{config => json_term_to_map(Config)});
  456. parse_resource_params([{<<"description">>, Descr} | Params], Res) ->
  457. parse_resource_params(Params, Res#{description => Descr});
  458. parse_resource_params([_ | Params], Res) ->
  459. parse_resource_params(Params, Res).
  460. json_term_to_map(List) ->
  461. emqx_json:decode(emqx_json:encode(List), [return_maps]).
  462. sort_by_title(action, Actions) ->
  463. sort_by(#action.title, Actions);
  464. sort_by_title(resource_type, ResourceTypes) ->
  465. sort_by(#resource_type.title, ResourceTypes).
  466. sort_by(Pos, TplList) ->
  467. lists:sort(
  468. fun(RecA, RecB) ->
  469. maps:get(en, element(Pos, RecA), 0)
  470. =< maps:get(en, element(Pos, RecB), 0)
  471. end, TplList).
  472. get_rule_metrics(Id) ->
  473. [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
  474. || Node <- ekka_mnesia:running_nodes()].
  475. get_action_metrics(Id) ->
  476. [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
  477. || Node <- ekka_mnesia:running_nodes()].
  478. %% TODO: V5 API
  479. return(_) -> ok.