emqx_rule_engine_cli.erl 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_cli).
  17. -include("rule_engine.hrl").
  18. -export([ load/0
  19. , commands/0
  20. , unload/0
  21. ]).
  22. -export([ rules/1
  23. , actions/1
  24. , resources/1
  25. , resource_types/1
  26. ]).
  27. -import(proplists, [get_value/2]).
%% Option specifications handed to getopt:parse_and_check/2 by with_opts/4.
%% Each entry is {Name, ShortFlag, LongFlag, ArgSpec, HelpText}. Entries whose
%% short and long flags are both `undefined` are positional arguments in
%% getopt's option-spec format.

%% Optional resource-type filter (used by `resources list`).
-define(OPTSPEC_RESOURCE_TYPE,
        [{type, $t, "type", {atom, undefined}, "Resource Type"}]).

%% NOTE(review): not referenced by any function visible in this file.
-define(OPTSPEC_ACTION_TYPE,
        [ {eventype, $k, "eventype", {atom, undefined}, "Event Type"}
        ]).

%% Options for `resources create` and `resources test`.
%% The empty-binary default of `id` is the "not provided" marker checked by
%% may_with_opt/4 in make_resource/1.
-define(OPTSPEC_RESOURCES_CREATE,
        [ {type, undefined, undefined, atom, "Resource Type"}
        , {id, $i, "id", {binary, <<"">>}, "The resource id. A random resource id will be used if not provided"}
        , {config, $c, "config", {binary, <<"{}">>}, "Config"}
        , {descr, $d, "descr", {binary, <<"">>}, "Description"}
        ]).

%% Options for `resources update`. Defaults are `undefined` so that
%% make_updated_resource/1 can tell which fields were actually supplied.
%% Note that the description option is named `description` here (not `descr`
%% as in the other specs); make_updated_resource/1 reads it under that name.
-define(OPTSPEC_RESOURCES_UPDATE,
        [ {id, undefined, undefined, binary, "The resource id"}
        , {config, $c, "config", {binary, undefined}, "Config"}
        , {description, $d, "descr", {binary, undefined}, "Description"}
        ]).

%% Options for `rules create`. `sql` and `actions` are positional.
-define(OPTSPEC_RULES_CREATE,
        [ {sql, undefined, undefined, binary, "Filter Condition SQL"}
        , {actions, undefined, undefined, binary, "Action List in JSON format: [{\"name\": <action_name>, \"params\": {<key>: <value>}}]"}
        , {id, $i, "id", {binary, <<"">>}, "The rule id. A random rule id will be used if not provided"}
        , {enabled, $e, "enabled", {atom, true}, "'true' or 'false' to enable or disable the rule"}
        , {on_action_failed, $g, "on_action_failed", {atom, continue}, "'continue' or 'stop' when an action in the rule fails"}
        , {descr, $d, "descr", {binary, <<"">>}, "Description"}
        ]).

%% Options for `rules update`. Only `id` is positional; every other option
%% defaults to `undefined`, which make_updated_rule/1 treats as "unchanged".
-define(OPTSPEC_RULES_UPDATE,
        [ {id, undefined, undefined, binary, "Rule ID"}
        , {sql, $s, "sql", {binary, undefined}, "Filter Condition SQL"}
        , {actions, $a, "actions", {binary, undefined}, "Action List in JSON format: [{\"name\": <action_name>, \"params\": {<key>: <value>}}]"}
        , {enabled, $e, "enabled", {atom, undefined}, "'true' or 'false' to enable or disable the rule"}
        , {on_action_failed, $g, "on_action_failed", {atom, undefined}, "'continue' or 'stop' when an action in the rule fails"}
        , {descr, $d, "descr", {binary, undefined}, "Description"}
        ]).
  60. %%-----------------------------------------------------------------------------
  61. %% Load/Unload Commands
  62. %%-----------------------------------------------------------------------------
  63. -spec(load() -> ok).
  64. load() ->
  65. lists:foreach(
  66. fun({Cmd, Func}) ->
  67. emqx_ctl:register_command(Cmd, {?MODULE, Func}, []);
  68. (Cmd) ->
  69. emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, [])
  70. end, commands()).
  71. -spec(commands() -> list(atom())).
  72. commands() ->
  73. [rules, {'rule-actions', actions}, resources, {'resource-types', resource_types}].
  74. -spec(unload() -> ok).
  75. unload() ->
  76. lists:foreach(
  77. fun({Cmd, _Func}) ->
  78. emqx_ctl:unregister_command(Cmd);
  79. (Cmd) ->
  80. emqx_ctl:unregister_command(Cmd)
  81. end, commands()).
  82. %%-----------------------------------------------------------------------------
  83. %% 'rules' command
  84. %%-----------------------------------------------------------------------------
  85. -dialyzer([{nowarn_function, [rules/1]}]).
  86. rules(["list"]) ->
  87. print_all(emqx_rule_registry:get_rules_ordered_by_ts());
  88. rules(["show", RuleId]) ->
  89. print_with(fun emqx_rule_registry:get_rule/1, list_to_binary(RuleId));
  90. rules(["create" | Params]) ->
  91. with_opts(fun({Opts, _}) ->
  92. case emqx_rule_engine:create_rule(make_rule(Opts)) of
  93. {ok, #rule{id = RuleId}} ->
  94. emqx_ctl:print("Rule ~s created~n", [RuleId]);
  95. {error, Reason} ->
  96. emqx_ctl:print("Invalid options: ~0p~n", [Reason])
  97. end
  98. end, Params, ?OPTSPEC_RULES_CREATE, {?FUNCTION_NAME, create});
  99. rules(["update" | Params]) ->
  100. with_opts(fun({Opts, _}) ->
  101. case emqx_rule_engine:update_rule(make_updated_rule(Opts)) of
  102. {ok, #rule{id = RuleId}} ->
  103. emqx_ctl:print("Rule ~s updated~n", [RuleId]);
  104. {error, Reason} ->
  105. emqx_ctl:print("Invalid options: ~0p~n", [Reason])
  106. end
  107. end, Params, ?OPTSPEC_RULES_UPDATE, {?FUNCTION_NAME, update});
  108. rules(["delete", RuleId]) ->
  109. ok = emqx_rule_engine:delete_rule(list_to_binary(RuleId)),
  110. emqx_ctl:print("ok~n");
  111. rules(_Usage) ->
  112. emqx_ctl:usage([{"rules list", "List all rules"},
  113. {"rules show <RuleId>", "Show a rule"},
  114. {"rules create", "Create a rule"},
  115. {"rules delete <RuleId>", "Delete a rule"}
  116. ]).
  117. %%-----------------------------------------------------------------------------
  118. %% 'rule-actions' command
  119. %%-----------------------------------------------------------------------------
  120. actions(["list"]) ->
  121. print_all(get_actions());
  122. actions(["show", ActionId]) ->
  123. print_with(fun emqx_rule_registry:find_action/1,
  124. ?RAISE(list_to_existing_atom(ActionId), {not_found, ActionId}));
  125. actions(_Usage) ->
  126. emqx_ctl:usage([{"rule-actions list", "List actions"},
  127. {"rule-actions show <ActionId>", "Show a rule action"}
  128. ]).
  129. %%------------------------------------------------------------------------------
  130. %% 'resources' command
  131. %%------------------------------------------------------------------------------
  132. resources(["create" | Params]) ->
  133. with_opts(fun({Opts, _}) ->
  134. case emqx_rule_engine:create_resource(make_resource(Opts)) of
  135. {ok, #resource{id = ResId}} ->
  136. emqx_ctl:print("Resource ~s created~n", [ResId]);
  137. {error, Reason} ->
  138. emqx_ctl:print("Invalid options: ~0p~n", [Reason])
  139. end
  140. end, Params, ?OPTSPEC_RESOURCES_CREATE, {?FUNCTION_NAME, create});
  141. resources(["update" | Params]) ->
  142. with_opts(fun({Opts, _}) ->
  143. Id = proplists:get_value(id, Opts),
  144. Maps = make_updated_resource(Opts),
  145. case emqx_rule_engine:update_resource(Id, Maps) of
  146. ok ->
  147. emqx_ctl:print("Resource update successfully~n");
  148. {error, Reason} ->
  149. emqx_ctl:print("Resource update failed: ~0p~n", [Reason])
  150. end
  151. end, Params, ?OPTSPEC_RESOURCES_UPDATE, {?FUNCTION_NAME, update});
  152. resources(["test" | Params]) ->
  153. with_opts(fun({Opts, _}) ->
  154. case emqx_rule_engine:test_resource(make_resource(Opts)) of
  155. ok ->
  156. emqx_ctl:print("Test creating resource successfully (dry-run)~n");
  157. {error, Reason} ->
  158. emqx_ctl:print("Test creating resource failed: ~0p~n", [Reason])
  159. end
  160. end, Params, ?OPTSPEC_RESOURCES_CREATE, {?FUNCTION_NAME, test});
  161. resources(["list"]) ->
  162. print_all(emqx_rule_registry:get_resources());
  163. resources(["list" | Params]) ->
  164. with_opts(fun({Opts, _}) ->
  165. print_all(emqx_rule_registry:get_resources_by_type(
  166. get_value(type, Opts)))
  167. end, Params, ?OPTSPEC_RESOURCE_TYPE, {?FUNCTION_NAME, list});
  168. resources(["show", ResourceId]) ->
  169. print_with(fun emqx_rule_registry:find_resource/1, list_to_binary(ResourceId));
  170. resources(["delete", ResourceId]) ->
  171. case emqx_rule_engine:delete_resource(list_to_binary(ResourceId)) of
  172. ok -> emqx_ctl:print("ok~n");
  173. {error, not_found} -> emqx_ctl:print("ok~n");
  174. {error, Reason} ->
  175. emqx_ctl:print("Cannot delete resource as ~0p~n", [Reason])
  176. end;
  177. resources(_Usage) ->
  178. emqx_ctl:usage([{"resources create", "Create a resource"},
  179. {"resources list [-t <ResourceType>]", "List resources"},
  180. {"resources show <ResourceId>", "Show a resource"},
  181. {"resources delete <ResourceId>", "Delete a resource"},
  182. {"resources update <ResourceId> [-c <config>] [-d <description>]", "Update a resource"}
  183. ]).
  184. %%------------------------------------------------------------------------------
  185. %% 'resource-types' command
  186. %%------------------------------------------------------------------------------
  187. resource_types(["list"]) ->
  188. print_all(emqx_rule_registry:get_resource_types());
  189. resource_types(["show", Name]) ->
  190. print_with(fun emqx_rule_registry:find_resource_type/1, list_to_atom(Name));
  191. resource_types(_Usage) ->
  192. emqx_ctl:usage([{"resource-types list", "List all resource-types"},
  193. {"resource-types show <Type>", "Show a resource-type"}
  194. ]).
  195. %%------------------------------------------------------------------------------
  196. %% Internal functions
  197. %%------------------------------------------------------------------------------
  198. print(Data) ->
  199. emqx_ctl:print(untilde(format(Data))).
  200. print_all(DataList) ->
  201. lists:map(fun(Data) ->
  202. print(Data)
  203. end, DataList).
  204. print_with(FindFun, Key) ->
  205. case FindFun(Key) of
  206. {ok, R} ->
  207. print(R);
  208. not_found ->
  209. emqx_ctl:print("Cannot found ~s~n", [Key])
  210. end.
  211. format(#rule{id = Id,
  212. for = Hook,
  213. rawsql = Sql,
  214. actions = Actions,
  215. on_action_failed = OnFailed,
  216. enabled = Enabled,
  217. description = Descr}) ->
  218. lists:flatten(io_lib:format("rule(id='~s', for='~0p', rawsql='~s', actions=~0p, on_action_failed='~s', metrics=~0p, enabled='~s', description='~s')~n", [Id, Hook, rmlf(Sql), printable_actions(Actions), OnFailed, get_rule_metrics(Id), Enabled, Descr]));
  219. format(#action{hidden = true}) ->
  220. ok;
  221. format(#action{name = Name,
  222. for = Hook,
  223. app = App,
  224. types = Types,
  225. title = #{en := Title},
  226. description = #{en := Descr}}) ->
  227. lists:flatten(io_lib:format("action(name='~s', app='~s', for='~s', types=~0p, title ='~s', description='~s')~n", [Name, App, Hook, Types, Title, Descr]));
  228. format(#resource{id = Id,
  229. type = Type,
  230. config = Config,
  231. description = Descr}) ->
  232. Status =
  233. [begin
  234. {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]),
  235. maps:put(node, Node, St)
  236. end || Node <- [node()| nodes()]],
  237. lists:flatten(io_lib:format("resource(id='~s', type='~s', config=~0p, status=~0p, description='~s')~n", [Id, Type, Config, Status, Descr]));
  238. format(#resource_type{name = Name,
  239. provider = Provider,
  240. title = #{en := Title},
  241. description = #{en := Descr}}) ->
  242. lists:flatten(io_lib:format("resource_type(name='~s', provider='~s', title ='~s', description='~s')~n", [Name, Provider, Title, Descr])).
  243. make_rule(Opts) ->
  244. Actions = get_value(actions, Opts),
  245. may_with_opt(
  246. #{rawsql => get_value(sql, Opts),
  247. enabled => get_value(enabled, Opts),
  248. actions => parse_actions(emqx_json:decode(Actions, [return_maps])),
  249. on_action_failed => on_failed(get_value(on_action_failed, Opts)),
  250. description => get_value(descr, Opts)}, id, <<"">>, Opts).
  251. make_updated_rule(Opts) ->
  252. KeyNameParsers = [{sql, rawsql, fun(SQL) -> SQL end},
  253. enabled,
  254. {actions, actions, fun(Actions) ->
  255. parse_actions(emqx_json:decode(Actions, [return_maps]))
  256. end},
  257. on_action_failed,
  258. {descr, description, fun(Descr) -> Descr end}],
  259. lists:foldl(fun
  260. ({Key, Name, Parser}, ParamsAcc) ->
  261. case get_value(Key, Opts) of
  262. undefined -> ParamsAcc;
  263. Val -> ParamsAcc#{Name => Parser(Val)}
  264. end;
  265. (Key, ParamsAcc) ->
  266. case get_value(Key, Opts) of
  267. undefined -> ParamsAcc;
  268. Val -> ParamsAcc#{Key => Val}
  269. end
  270. end, #{id => get_value(id, Opts)}, KeyNameParsers).
  271. make_resource(Opts) ->
  272. Config = get_value(config, Opts),
  273. may_with_opt(
  274. #{type => get_value(type, Opts),
  275. config => ?RAISE(emqx_json:decode(Config, [return_maps]), {invalid_config, Config}),
  276. description => get_value(descr, Opts)}, id, <<"">>, Opts).
  277. make_updated_resource(Opts) ->
  278. P1 = case proplists:get_value(description, Opts) of
  279. undefined -> #{};
  280. Value -> #{<<"description">> => Value}
  281. end,
  282. P2 = case proplists:get_value(config, Opts) of
  283. undefined -> #{};
  284. Map -> #{<<"config">> => ?RAISE((emqx_json:decode(Map, [return_maps])), {invalid_config, Map})}
  285. end,
  286. maps:merge(P1, P2).
  287. printable_actions(Actions) when is_list(Actions) ->
  288. emqx_json:encode([#{id => Id, name => Name, params => Args,
  289. metrics => get_action_metrics(Id),
  290. fallbacks => printable_actions(Fallbacks)}
  291. || #action_instance{id = Id, name = Name, args = Args, fallbacks = Fallbacks} <- Actions]).
  292. may_with_opt(Params, OptName, DefaultVal, Options) when is_map(Params) ->
  293. case get_value(OptName, Options) of
  294. DefaultVal -> Params;
  295. Val -> Params#{OptName => Val}
  296. end.
  297. with_opts(Action, RawParams, OptSpecList, {CmdObject, CmdName}) ->
  298. case getopt:parse_and_check(OptSpecList, RawParams) of
  299. {ok, Params} ->
  300. Action(Params);
  301. {error, Reason} ->
  302. getopt:usage(OptSpecList,
  303. io_lib:format("emqx_ctl ~s ~s", [CmdObject, CmdName]), standard_io),
  304. emqx_ctl:print("~0p~n", [Reason])
  305. end.
%% @doc Convert a list of decoded action maps with parse_action/1.
%% The whole conversion is wrapped in ?RAISE with an
%% `invalid_action_params` error term.
%% NOTE(review): _EXCLASS_, _EXCPTION_ and _ST_ are not bound in this
%% function; presumably they are bound inside the expansion of the ?RAISE
%% macro, which is defined outside this file -- confirm against the header.
parse_actions(Actions) ->
    ?RAISE([parse_action(Action) || Action <- Actions],
           {invalid_action_params, {_EXCLASS_,_EXCPTION_,_ST_}}).
  309. parse_action(Action) ->
  310. ActName = maps:get(<<"name">>, Action),
  311. #{name => ?RAISE(binary_to_existing_atom(ActName, utf8), {action_not_found, ActName}),
  312. args => maps:get(<<"params">>, Action, #{}),
  313. fallbacks => parse_actions(maps:get(<<"fallbacks">>, Action, []))}.
%% @doc Return whatever emqx_rule_registry:get_actions/0 returns; used by
%% the `rule-actions list` command.
get_actions() ->
    emqx_rule_registry:get_actions().
  316. get_rule_metrics(Id) ->
  317. [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
  318. || Node <- [node()| nodes()]].
  319. get_action_metrics(Id) ->
  320. [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
  321. || Node <- [node()| nodes()]].
  322. on_failed(continue) -> continue;
  323. on_failed(stop) -> stop;
  324. on_failed(OnFailed) -> error({invalid_on_failed, OnFailed}).
  325. rmlf(Str) ->
  326. re:replace(Str, "\n", "", [global]).
  327. untilde(Str) ->
  328. re:replace(Str, "~", "&&", [{return, list}, global]).