emqx_rule_runtime.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_runtime).
  17. -include("rule_engine.hrl").
  18. -include_lib("emqx/include/emqx.hrl").
  19. -include_lib("emqx/include/logger.hrl").
  20. -export([ apply_rule/2
  21. , apply_rules/2
  22. , clear_rule_payload/0
  23. ]).
  24. -import(emqx_rule_maps,
  25. [ nested_get/2
  26. , range_gen/2
  27. , range_get/3
  28. ]).
  %% Data a rule is evaluated against; always handled as a map in this module.
  -type input() :: map().
  %% Name part of a FOREACH collection (see collection()).
  -type alias() :: atom().
  %% Result of the FOREACH clause: a name paired with the list of items.
  -type collection() :: {alias(), [term()]}.

  %% Builds a binary of the form `_v_<TYPE>_<NAME>_<system time>', used as the
  %% key for selected expressions that have no explicit alias.
  -define(ephemeral_alias(TYPE, NAME),
          iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))).

  %% Initial retry budget handed to take_action/5 by take_actions/4.
  -define(ActionMaxRetry, 3).
  %%------------------------------------------------------------------------------
  %% Apply rules
  %%------------------------------------------------------------------------------

  %% Walks the rule list in order and applies every enabled rule to Input.
  %% An exception raised while applying one rule is logged here, after which
  %% the walk carries on with the remaining rules.
  -spec apply_rules(list(emqx_rule_engine:rule()), input()) -> ok.
  apply_rules([], _Input) ->
      ok;
  apply_rules([#rule{enabled = false} | Rest], Input) ->
      %% disabled rules are skipped without any logging
      apply_rules(Rest, Input);
  apply_rules([#rule{id = RuleID} = Rule | Rest], Input) ->
      try
          apply_rule_discard_result(Rule, Input)
      catch
          %% failures inside a SQL clause are only worth a warning
          _:{select_and_transform_error, Reason} ->
              ?LOG(warning, "SELECT clause exception for ~s failed: ~p",
                   [RuleID, Reason]);
          _:{match_conditions_error, Reason} ->
              ?LOG(warning, "WHERE clause exception for ~s failed: ~p",
                   [RuleID, Reason]);
          _:{select_and_collect_error, Reason} ->
              ?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
                   [RuleID, Reason]);
          _:{match_incase_error, Reason} ->
              ?LOG(warning, "INCASE clause exception for ~s failed: ~p",
                   [RuleID, Reason]);
          %% anything else is reported as an error together with its stacktrace
          _:Reason:Stacktrace ->
              ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
                   [RuleID, Reason, Stacktrace])
      end,
      apply_rules(Rest, Input).
  %% Applies a single rule and drops whatever apply_rule/2 returns, so the
  %% caller only ever sees `ok' (or the exception raised by the rule).
  apply_rule_discard_result(Rule, Input) ->
      _Ignored = apply_rule(Rule, Input),
      ok.
  %% Applies one rule to Input.
  %% The payload cached in the process dictionary is erased first, then the
  %% rule id is recorded under the `metadata' key of the input before the
  %% rule is evaluated.
  apply_rule(#rule{id = RuleID} = Rule, Input) ->
      clear_rule_payload(),
      InputWithMeta = add_metadata(Input, #{rule_id => RuleID}),
      do_apply_rule(Rule, InputWithMeta).
  %% Evaluates a rule against Input and runs its actions when the conditions hold.
  %%
  %% Returns `{ok, ActionResults}' when the conditions match and
  %% `{error, nomatch}' otherwise. For FOREACH rules ActionResults holds one
  %% result list per collected item that passed filter_collection/4.
  %% NOTE(review): ?RAISE is defined outside this file; it presumably rethrows
  %% a failure of its first argument as the tagged term given second - confirm.
  do_apply_rule(#rule{id = RuleId,
                      is_foreach = true,
                      fields = Fields,
                      doeach = DoEach,
                      incase = InCase,
                      conditions = Conditions,
                      on_action_failed = OnFailed,
                      actions = Actions}, Input) ->
      {Selected, Collection} =
          ?RAISE(select_and_collect(Fields, Input),
                 {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
      %% conditions are checked against the input enriched with the selection
      Columns = maps:merge(Input, Selected),
      Matched = ?RAISE(match_conditions(Conditions, Columns),
                       {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
      case Matched of
          false ->
              {error, nomatch};
          true ->
              ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
              Items = filter_collection(Input, InCase, DoEach, Collection),
              {ok, [take_actions(Actions, Item, Input, OnFailed) || Item <- Items]}
      end;
  do_apply_rule(#rule{id = RuleId,
                      is_foreach = false,
                      fields = Fields,
                      conditions = Conditions,
                      on_action_failed = OnFailed,
                      actions = Actions}, Input) ->
      Selected = ?RAISE(select_and_transform(Fields, Input),
                        {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
      Matched = ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
                       {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
      case Matched of
          false ->
              {error, nomatch};
          true ->
              ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
              {ok, take_actions(Actions, Selected, Input, OnFailed)}
      end.
  %% Removes the decoded payload cached under `rule_payload' in the process
  %% dictionary. Returns the erased value, or `undefined' if nothing was cached.
  clear_rule_payload() ->
      erase(rule_payload).
  %% SELECT Clause
  %% Evaluates the selected fields left to right and returns the output map.
  %% Each evaluated field is also written back into the working input, so a
  %% later field can refer to an earlier one.
  select_and_transform(Fields, Input) ->
      select_and_transform(Fields, Input, #{}).

  select_and_transform([], _Input, Output) ->
      Output;
  select_and_transform(['*' | Rest], Input, Output) ->
      %% `*' copies the whole working input into the output
      select_and_transform(Rest, Input, maps:merge(Output, Input));
  select_and_transform([{as, Expr, Alias} | Rest], Input, Output) ->
      Value = eval(Expr, Input),
      NextInput = nested_put(Alias, Value, Input),
      NextOutput = nested_put(Alias, Value, Output),
      select_and_transform(Rest, NextInput, NextOutput);
  select_and_transform([Expr | Rest], Input, Output) ->
      Value = eval(Expr, Input),
      %% no explicit alias: derive the key from the expression itself
      Key = alias(Expr),
      NextInput = nested_put(Key, Value, Input),
      NextOutput = nested_put(Key, Value, Output),
      select_and_transform(Rest, NextInput, NextOutput).
  %% FOREACH Clause
  %% Evaluates the fields like a SELECT, except that the LAST field also
  %% supplies the collection to iterate over. Returns `{Output, {Name, Items}}'
  %% where Items is `[]' when the last field did not evaluate to a list.
  -spec select_and_collect(list(), input()) -> {input(), collection()}.
  select_and_collect(Fields, Input) ->
      select_and_collect(Fields, Input, {#{}, {'item', []}}).

  %% last field, with alias: the collection is named after the alias
  select_and_collect([{as, Expr, {_, Name} = Alias}], Input, {Output, _}) ->
      Value = eval(Expr, Input),
      {nested_put(Alias, Value, Output), {Name, ensure_list(Value)}};
  select_and_collect([{as, Expr, Alias} | Rest], Input, {Output, LastKV}) ->
      Value = eval(Expr, Input),
      NextInput = nested_put(Alias, Value, Input),
      NextOutput = nested_put(Alias, Value, Output),
      select_and_collect(Rest, NextInput, {NextOutput, LastKV});
  %% last field, no alias: the collection is named 'item'
  select_and_collect([Expr], Input, {Output, _}) ->
      Value = eval(Expr, Input),
      Key = alias(Expr),
      {nested_put(Key, Value, Output), {'item', ensure_list(Value)}};
  select_and_collect([Expr | Rest], Input, {Output, LastKV}) ->
      Value = eval(Expr, Input),
      Key = alias(Expr),
      NextInput = nested_put(Key, Value, Input),
      NextOutput = nested_put(Key, Value, Output),
      select_and_collect(Rest, NextInput, {NextOutput, LastKV}).
  %% Filter each item got from FOREACH
  %% Every item is merged into Input under CollKey and tested against the
  %% INCASE conditions. Items that pass are kept: as the merged map when DoEach
  %% is empty, otherwise as the result of running DoEach on the merged map.
  -dialyzer({nowarn_function, filter_collection/4}).
  filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
      KeepOrDrop =
          fun(Item) ->
              InputAndItem = maps:merge(Input, #{CollKey => Item}),
              Passed = ?RAISE(match_conditions(InCase, InputAndItem),
                              {match_incase_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
              case Passed of
                  false ->
                      false;
                  true when DoEach == [] ->
                      {true, InputAndItem};
                  true ->
                      {true, ?RAISE(select_and_transform(DoEach, InputAndItem),
                                    {doeach_error, {_EXCLASS_,_EXCPTION_,_ST_}})}
              end
          end,
      lists:filtermap(KeepOrDrop, CollVal).
  %% Conditional Clauses such as WHERE, WHEN.
  %% Evaluates a condition tree against Data. The empty tuple `{}' stands for
  %% "no condition" and always matches.
  match_conditions({'and', Left, Right}, Data) ->
      match_conditions(Left, Data) andalso match_conditions(Right, Data);
  match_conditions({'or', Left, Right}, Data) ->
      match_conditions(Left, Data) orelse match_conditions(Right, Data);
  match_conditions({'not', Expr}, Data) ->
      %% only an operand that evaluates to exactly `false' is negated to true;
      %% `true' and any non-boolean value both yield false
      eval(Expr, Data) =:= false;
  match_conditions({in, Expr, {list, Candidates}}, Data) ->
      Needle = eval(Expr, Data),
      Haystack = [eval(Candidate, Data) || Candidate <- Candidates],
      lists:member(Needle, Haystack);
  match_conditions({'fun', {_, Name}, Args}, Data) ->
      ArgValues = [eval(Arg, Data) || Arg <- Args],
      apply_func(Name, ArgValues, Data);
  match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
      Lhs = eval(L, Data),
      Rhs = eval(R, Data),
      compare(Op, Lhs, Rhs);
  %%match_conditions({'like', Var, Pattern}, Data) ->
  %% match_like(eval(Var, Data), Pattern);
  match_conditions({}, _Data) ->
      true.
  %% Compares two values after bringing mixed operand types to a common form:
  %% a number against a binary converts the binary with number/1, and an atom
  %% against a binary converts the atom to a UTF-8 binary. Every other
  %% combination is compared as is.
  compare(Op, Num, Bin) when is_number(Num), is_binary(Bin) ->
      do_compare(Op, Num, number(Bin));
  compare(Op, Bin, Num) when is_binary(Bin), is_number(Num) ->
      do_compare(Op, number(Bin), Num);
  compare(Op, Atom, Bin) when is_atom(Atom), is_binary(Bin) ->
      do_compare(Op, atom_to_binary(Atom, utf8), Bin);
  compare(Op, Bin, Atom) when is_binary(Bin), is_atom(Atom) ->
      do_compare(Op, Bin, atom_to_binary(Atom, utf8));
  compare(Op, Lhs, Rhs) ->
      do_compare(Op, Lhs, Rhs).
  %% Applies one comparison operator to two already-normalised operands.
  %% `=' and the two inequality spellings use Erlang's `=='/`/=', so 1 and 1.0
  %% compare equal. `=~' delegates to emqx_topic:match/2.
  do_compare('=', A, B) ->
      A == B;
  do_compare('>', A, B) ->
      A > B;
  do_compare('<', A, B) ->
      A < B;
  do_compare('<=', A, B) ->
      A =< B;
  do_compare('>=', A, B) ->
      A >= B;
  do_compare(Op, A, B) when Op =:= '<>'; Op =:= '!=' ->
      A /= B;
  do_compare('=~', Topic, Filter) ->
      emqx_topic:match(Topic, Filter).
  %% Converts a binary to a number: integer syntax is tried first, float syntax
  %% second. A binary that is neither makes binary_to_float/1 raise badarg.
  number(Bin) ->
      try binary_to_integer(Bin) of
          Int -> Int
      catch
          error:badarg -> binary_to_float(Bin)
      end.
  %% Step3 -> Take actions
  %% Runs every action instance in list order, each starting with the full
  %% retry budget, and returns the list of their results.
  take_actions(Actions, Selected, Envs, OnFailed) ->
      MaxRetry = ?ActionMaxRetry,
      [take_action(Action, Selected, Envs, OnFailed, MaxRetry) || Action <- Actions].
  %% Runs one action instance with the selected data.
  %%
  %% A `{badact, Reason}' result or any exception is routed to
  %% handle_action_failure/6. The one exception handled differently is
  %% `error:{badfun, _}': the action is refreshed under a lock and tried again
  %% with one retry less. Once the retry count drops below zero the second
  %% clause reports `max_try_reached'.
  take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
              Selected, Envs, OnFailed, RetryN) when RetryN >= 0 ->
      try
          {ok, #action_instance_params{apply = Apply}} =
              emqx_rule_registry:get_action_instance_params(Id),
          emqx_rule_metrics:inc_actions_taken(Id),
          apply_action_func(Selected, Envs, Apply, ActName)
      of
          {badact, Why} ->
              handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Why);
          Outcome ->
              Outcome
      catch
          error:{badfun, _} ->
              %?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
              % "Func: ~p~nST:~0p", [Id, Func, ST]),
              Refresh = fun() -> emqx_rule_engine:refresh_actions([ActInst]) end,
              _ = trans_action_on(Id, Refresh, 5000),
              emqx_rule_metrics:inc_actions_retry(Id),
              take_action(ActInst, Selected, Envs, OnFailed, RetryN - 1);
          Class:Why:Stacktrace ->
              emqx_rule_metrics:inc_actions_exception(Id),
              handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs,
                                    {Class, Why, Stacktrace})
      end;
  take_action(#action_instance{id = Id, fallbacks = Fallbacks},
              Selected, Envs, OnFailed, _RetryN) ->
      emqx_rule_metrics:inc_actions_error(Id),
      handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs,
                            {max_try_reached, ?ActionMaxRetry}).
  %% Invokes an action. The action is either described by a map carrying the
  %% callback module and its bindings, or is a plain fun taking (Data, Envs).
  apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) ->
      %% TODO: Build the Func Name when creating the action
      Callback = cbk_on_action_triggered(Name),
      EnvsWithBindings = Envs#{'__bindings__' => Bindings},
      Mod:Callback(Data, EnvsWithBindings);
  apply_action_func(Data, Envs, Func, _Name) when is_function(Func) ->
      Func(Data, Envs).
  %% Derives the callback function name for an action: `on_action_<Name>'.
  cbk_on_action_triggered(Name) ->
      Suffix = atom_to_list(Name),
      list_to_atom("on_action_" ++ Suffix).
  %% Runs Callback while holding the lock for Id; the lock is released even if
  %% Callback raises. When the lock cannot be taken, waits instead via
  %% wait_action_on/2, which sleeps 10 ms per attempt - hence Timeout div 10
  %% attempts.
  trans_action_on(Id, Callback, Timeout) ->
      Locked = emqx_rule_locker:lock(Id),
      case Locked of
          true ->
              try
                  Callback()
              after
                  emqx_rule_locker:unlock(Id)
              end;
          _ ->
              wait_action_on(Id, Timeout div 10)
      end.
  %% Polls, at most RetryN times with a 10 ms pause before each attempt, until
  %% the action registered under Id can be invoked again.
  %%
  %% Returns `ok' once a probe call no longer fails with `badfun',
  %% `{error, not_found}' if the action instance has disappeared, and
  %% `{error, timeout}' when the attempts are used up.
  wait_action_on(_, 0) ->
      {error, timeout};
  wait_action_on(Id, RetryN) ->
      timer:sleep(10),
      case emqx_rule_registry:get_action_instance_params(Id) of
          not_found ->
              {error, not_found};
          {ok, #action_instance_params{apply = Apply}} ->
              %% Probe the action with dummy arguments. try/catch is used rather
              %% than the old-style `catch' so that only a genuine
              %% error:{badfun, _} triggers another round, and a value that the
              %% probe merely returns or throws can never be mistaken for one.
              try apply_action_func(baddata, #{}, Apply, tryit) of
                  _ ->
                      ok
              catch
                  error:{badfun, _} ->
                      wait_action_on(Id, RetryN - 1);
                  %% any other failure is expected from the dummy arguments and
                  %% shows that the fun itself is callable again
                  _:_ ->
                      ok
              end
      end.
  %% Reacts to a failed action according to the rule's failure policy.
  %% Both policies log the failure and run the fallback actions (themselves
  %% under the `continue' policy). `continue' then returns `failed'; `stop'
  %% raises `{take_action_failed, {Id, Cause}}'.
  handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Cause) ->
      ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Cause]),
      _ = take_actions(Fallbacks, Selected, Envs, continue),
      failed;
  handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Cause) ->
      ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Cause]),
      _ = take_actions(Fallbacks, Selected, Envs, continue),
      error({take_action_failed, {Id, Cause}}).
  %% Evaluates one expression node of a rule against Input.
  %% Paths that start at the payload (atom or binary key) are resolved inside
  %% the payload as returned by may_decode_payload/1.
  eval({path, [{key, <<"payload">>} | SubPath]}, #{payload := Payload}) ->
      Decoded = may_decode_payload(Payload),
      nested_get({path, SubPath}, Decoded);
  eval({path, [{key, <<"payload">>} | SubPath]}, #{<<"payload">> := Payload}) ->
      Decoded = may_decode_payload(Payload),
      nested_get({path, SubPath}, Decoded);
  eval({path, _} = Path, Input) ->
      nested_get(Path, Input);
  eval({range, {Begin, End}}, _Input) ->
      range_gen(Begin, End);
  eval({get_range, {Begin, End}, Expr}, Input) ->
      Source = eval(Expr, Input),
      range_get(Begin, End, Source);
  eval({var, _} = Var, Input) ->
      nested_get(Var, Input);
  eval({const, Value}, _Input) ->
      Value;
  %% unary add
  eval({'+', Operand}, Input) ->
      eval(Operand, Input);
  %% unary subtract
  eval({'-', Operand}, Input) ->
      -(eval(Operand, Input));
  eval({Op, L, R}, Input) when ?is_arith(Op) ->
      Operands = [eval(L, Input), eval(R, Input)],
      apply_func(Op, Operands, Input);
  eval({Op, L, R}, Input) when ?is_comp(Op) ->
      Lhs = eval(L, Input),
      Rhs = eval(R, Input),
      compare(Op, Lhs, Rhs);
  eval({list, Elements}, Input) ->
      [eval(Element, Input) || Element <- Elements];
  %% CASE without a subject: each clause carries its own condition
  eval({'case', <<>>, CaseClauses, ElseClauses}, Input) ->
      eval_case_clauses(CaseClauses, ElseClauses, Input);
  %% CASE with a subject: clauses are compared against the subject's value
  eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
      eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
  eval({'fun', {_, Name}, Args}, Input) ->
      ArgValues = [eval(Arg, Input) || Arg <- Args],
      apply_func(Name, ArgValues, Input).
  %% Prepares Input for a write through Alias: when the alias path starts at
  %% the payload, the payload entry is first replaced by the result of
  %% may_decode_payload/1. Any other alias leaves Input untouched.
  handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) ->
      Decoded = may_decode_payload(Payload),
      Input#{payload := Decoded};
  handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) ->
      Decoded = may_decode_payload(Payload),
      Input#{<<"payload">> := Decoded};
  handle_alias(_, Input) ->
      Input.
  %% Derives the key under which a selected expression without an explicit
  %% `AS' alias is stored.
  %%
  %% Variables, paths and binary constants keep their own name; a range lookup
  %% takes the name of the variable or path it reads from. All other
  %% expressions get a generated `_v_...' name (see ?ephemeral_alias).
  %%
  %% Every clause returns `{var, Key}' or `{path, Path}', because the result
  %% is handed to nested_put/3 as the key. The final catch-all used to return
  %% a bare binary, unlike every other clause; it now wraps the generated name
  %% in `{var, _}' as well.
  alias({var, Var}) ->
      {var, Var};
  alias({const, Val}) when is_binary(Val) ->
      {var, Val};
  alias({list, L}) ->
      {var, ?ephemeral_alias(list, length(L))};
  alias({range, R}) ->
      {var, ?ephemeral_alias(range, R)};
  alias({get_range, _, {var, Key}}) ->
      {var, Key};
  alias({get_range, _, {path, Path}}) ->
      {path, Path};
  alias({path, Path}) ->
      {path, Path};
  alias({const, Val}) ->
      {var, ?ephemeral_alias(const, Val)};
  alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) ->
      {var, ?ephemeral_alias(op, Op)};
  alias({'case', On, _, _}) ->
      {var, ?ephemeral_alias('case', On)};
  alias({'fun', Name, _}) ->
      {var, ?ephemeral_alias('fun', Name)};
  %% any other expression shape (e.g. the unary operators handled by eval/2)
  alias(_) ->
      {var, ?ephemeral_alias(unknown, unknown)}.
  %% Evaluates a CASE expression without a subject: returns the value of the
  %% first clause whose condition matches. When none matches, the ELSE branch
  %% is evaluated, or `undefined' is returned if there is no ELSE (`{}').
  eval_case_clauses([], {}, _Input) ->
      undefined;
  eval_case_clauses([], ElseClauses, Input) ->
      eval(ElseClauses, Input);
  eval_case_clauses([{Cond, Clause} | Rest], ElseClauses, Input) ->
      case match_conditions(Cond, Input) of
          true -> eval(Clause, Input);
          _ -> eval_case_clauses(Rest, ElseClauses, Input)
      end.
  %% Evaluates a CASE expression with a subject: returns the value of the first
  %% clause whose expression evaluates to exactly the same term as the subject.
  %% When none does, the ELSE branch is evaluated, or `undefined' is returned
  %% if there is no ELSE (`{}').
  eval_switch_clauses(_CaseOn, [], {}, _Input) ->
      undefined;
  eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
      eval(ElseClauses, Input);
  eval_switch_clauses(CaseOn, [{Cond, Clause} | Rest], ElseClauses, Input) ->
      %% the clause expression is evaluated before the subject, and the
      %% subject is re-evaluated for every clause
      Expected = eval(Cond, Input),
      Actual = eval(CaseOn, Input),
      case Actual =:= Expected of
          true -> eval(Clause, Input);
          false -> eval_switch_clauses(CaseOn, Rest, ElseClauses, Input)
      end.
  %% Calls a rule function given by name. A binary name is accepted only if it
  %% corresponds to an atom that already exists; otherwise
  %% `{sql_function_not_supported, Name}' is raised.
  apply_func(Name, Args, Input) when is_atom(Name) ->
      do_apply_func(Name, Args, Input);
  apply_func(Name, Args, Input) when is_binary(Name) ->
      FunName =
          try
              binary_to_existing_atom(Name, utf8)
          catch
              error:badarg ->
                  error({sql_function_not_supported, Name})
          end,
      do_apply_func(FunName, Args, Input).
  %% Calls emqx_rule_funcs:Name with Args. If the call returns a fun, that fun
  %% is applied to Input and its result returned; any other result is returned
  %% unchanged.
  do_apply_func(Name, Args, Input) ->
      Returned = apply(emqx_rule_funcs, Name, Args),
      case is_function(Returned) of
          true -> Returned(Input);
          false -> Returned
      end.
  %% Merges Metadata into the map stored under the `metadata' key of Input,
  %% creating that entry if it is missing. On a key clash the new value wins.
  add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
      Existing = maps:get(metadata, Input, #{}),
      Input#{metadata => maps:merge(Existing, Metadata)}.
  %%------------------------------------------------------------------------------
  %% Internal Functions
  %%------------------------------------------------------------------------------

  %% Returns the decoded form of a binary payload, reusing the value cached in
  %% the process dictionary when there is one. A payload that is not a binary
  %% is returned unchanged.
  may_decode_payload(Payload) when is_binary(Payload) ->
      case get_cached_payload() of
          undefined ->
              safe_decode_and_cache(Payload);
          Cached ->
              Cached
      end;
  may_decode_payload(NotBinary) ->
      NotBinary.
  %% Reads the decoded payload cached under `rule_payload' in the process
  %% dictionary; `undefined' when nothing is cached.
  get_cached_payload() ->
      get(rule_payload).
  %% Stores the decoded payload under `rule_payload' in the process dictionary
  %% and hands the same value back to the caller.
  cache_payload(DecodedP) ->
      _Previous = put(rule_payload, DecodedP),
      DecodedP.
  %% Decodes the payload with emqx_json:decode/2 (maps requested) and caches
  %% the result. If anything in that step raises, an empty map is returned and
  %% nothing is cached.
  safe_decode_and_cache(MaybeJson) ->
      try
          Decoded = emqx_json:decode(MaybeJson, [return_maps]),
          cache_payload(Decoded)
      catch
          _:_ -> #{}
      end.
  %% Returns the argument itself when it is a list and `[]' for anything else.
  ensure_list(Term) ->
      case is_list(Term) of
          true -> Term;
          false -> []
      end.
  %% Writes Val under Alias via emqx_rule_maps:nested_put/3, after letting
  %% handle_alias/2 prepare the target map for the write.
  nested_put(Alias, Val, Input0) ->
      emqx_rule_maps:nested_put(Alias, Val, handle_alias(Alias, Input0)).