emqx_rule_runtime.erl 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_runtime).
  17. -include("rule_engine.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include_lib("emqx_resource/include/emqx_resource_errors.hrl").
  20. -export([
  21. apply_rule/3,
  22. apply_rules/3,
  23. inc_action_metrics/2
  24. ]).
  25. -import(
  26. emqx_rule_maps,
  27. [
  28. nested_get/2,
  29. range_gen/2,
  30. range_get/3
  31. ]
  32. ).
  33. -compile({no_auto_import, [alias/2]}).
  34. -type columns() :: map().
  35. -type alias() :: atom().
  36. -type collection() :: {alias(), [term()]}.
  37. -elvis([
  38. {elvis_style, invalid_dynamic_call, #{ignore => [emqx_rule_runtime]}}
  39. ]).
  40. -define(ephemeral_alias(TYPE, NAME),
  41. iolist_to_binary(io_lib:format("_v_~ts_~p_~p", [TYPE, NAME, erlang:system_time()]))
  42. ).
  43. %%------------------------------------------------------------------------------
  44. %% Apply rules
  45. %%------------------------------------------------------------------------------
  46. -spec apply_rules(list(rule()), columns(), envs()) -> ok.
  47. apply_rules([], _Columns, _Envs) ->
  48. ok;
  49. apply_rules([#{enable := false} | More], Columns, Envs) ->
  50. apply_rules(More, Columns, Envs);
  51. apply_rules([Rule | More], Columns, Envs) ->
  52. apply_rule_discard_result(Rule, Columns, Envs),
  53. apply_rules(More, Columns, Envs).
  54. apply_rule_discard_result(Rule, Columns, Envs) ->
  55. _ = apply_rule(Rule, Columns, Envs),
  56. ok.
  57. apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
  58. ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'),
  59. clear_rule_payload(),
  60. try
  61. do_apply_rule(Rule, add_metadata(Columns, #{rule_id => RuleID}), Envs)
  62. catch
  63. %% ignore the errors if select or match failed
  64. _:Reason = {select_and_transform_error, Error} ->
  65. ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
  66. ?SLOG(warning, #{
  67. msg => "SELECT_clause_exception",
  68. rule_id => RuleID,
  69. reason => Error
  70. }),
  71. {error, Reason};
  72. _:Reason = {match_conditions_error, Error} ->
  73. ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
  74. ?SLOG(warning, #{
  75. msg => "WHERE_clause_exception",
  76. rule_id => RuleID,
  77. reason => Error
  78. }),
  79. {error, Reason};
  80. _:Reason = {select_and_collect_error, Error} ->
  81. ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
  82. ?SLOG(warning, #{
  83. msg => "FOREACH_clause_exception",
  84. rule_id => RuleID,
  85. reason => Error
  86. }),
  87. {error, Reason};
  88. _:Reason = {match_incase_error, Error} ->
  89. ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
  90. ?SLOG(warning, #{
  91. msg => "INCASE_clause_exception",
  92. rule_id => RuleID,
  93. reason => Error
  94. }),
  95. {error, Reason};
  96. Class:Error:StkTrace ->
  97. ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
  98. ?SLOG(error, #{
  99. msg => "apply_rule_failed",
  100. rule_id => RuleID,
  101. exception => Class,
  102. reason => Error,
  103. stacktrace => StkTrace
  104. }),
  105. {error, {Error, StkTrace}}
  106. end.
  107. do_apply_rule(
  108. #{
  109. id := RuleId,
  110. is_foreach := true,
  111. fields := Fields,
  112. doeach := DoEach,
  113. incase := InCase,
  114. conditions := Conditions,
  115. actions := Actions
  116. },
  117. Columns,
  118. Envs
  119. ) ->
  120. {Selected, Collection} = ?RAISE(
  121. select_and_collect(Fields, Columns),
  122. {select_and_collect_error, {EXCLASS, EXCPTION, ST}}
  123. ),
  124. ColumnsAndSelected = maps:merge(Columns, Selected),
  125. case
  126. ?RAISE(
  127. match_conditions(Conditions, ColumnsAndSelected),
  128. {match_conditions_error, {EXCLASS, EXCPTION, ST}}
  129. )
  130. of
  131. true ->
  132. Collection2 = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
  133. case Collection2 of
  134. [] ->
  135. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
  136. _ ->
  137. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
  138. end,
  139. NewEnvs = maps:merge(ColumnsAndSelected, Envs),
  140. {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]};
  141. false ->
  142. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
  143. {error, nomatch}
  144. end;
  145. do_apply_rule(
  146. #{
  147. id := RuleId,
  148. is_foreach := false,
  149. fields := Fields,
  150. conditions := Conditions,
  151. actions := Actions
  152. },
  153. Columns,
  154. Envs
  155. ) ->
  156. Selected = ?RAISE(
  157. select_and_transform(Fields, Columns),
  158. {select_and_transform_error, {EXCLASS, EXCPTION, ST}}
  159. ),
  160. case
  161. ?RAISE(
  162. match_conditions(Conditions, maps:merge(Columns, Selected)),
  163. {match_conditions_error, {EXCLASS, EXCPTION, ST}}
  164. )
  165. of
  166. true ->
  167. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
  168. {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
  169. false ->
  170. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
  171. {error, nomatch}
  172. end.
  173. clear_rule_payload() ->
  174. erlang:erase(rule_payload).
  175. %% SELECT Clause
  176. select_and_transform(Fields, Columns) ->
  177. select_and_transform(Fields, Columns, #{}).
  178. select_and_transform([], _Columns, Action) ->
  179. Action;
  180. select_and_transform(['*' | More], Columns, Action) ->
  181. select_and_transform(More, Columns, maps:merge(Action, Columns));
  182. select_and_transform([{as, Field, Alias} | More], Columns, Action) ->
  183. Val = eval(Field, [Action, Columns]),
  184. select_and_transform(
  185. More,
  186. Columns,
  187. nested_put(Alias, Val, Action)
  188. );
  189. select_and_transform([Field | More], Columns, Action) ->
  190. Val = eval(Field, [Action, Columns]),
  191. Key = alias(Field, Columns),
  192. select_and_transform(
  193. More,
  194. Columns,
  195. nested_put(Key, Val, Action)
  196. ).
  197. %% FOREACH Clause
  198. -spec select_and_collect(list(), columns()) -> {columns(), collection()}.
  199. select_and_collect(Fields, Columns) ->
  200. select_and_collect(Fields, Columns, {#{}, {'item', []}}).
  201. select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) ->
  202. Val = eval(Field, [Action, Columns]),
  203. {nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
  204. select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) ->
  205. Val = eval(Field, [Action, Columns]),
  206. select_and_collect(
  207. More,
  208. nested_put(Alias, Val, Columns),
  209. {nested_put(Alias, Val, Action), LastKV}
  210. );
  211. select_and_collect([Field], Columns, {Action, _}) ->
  212. Val = eval(Field, [Action, Columns]),
  213. Key = alias(Field, Columns),
  214. {nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
  215. select_and_collect([Field | More], Columns, {Action, LastKV}) ->
  216. Val = eval(Field, [Action, Columns]),
  217. Key = alias(Field, Columns),
  218. select_and_collect(
  219. More,
  220. Columns,
  221. {nested_put(Key, Val, Action), LastKV}
  222. ).
  223. %% Filter each item got from FOREACH
  224. filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) ->
  225. lists:filtermap(
  226. fun(Item) ->
  227. ColumnsAndItem = maps:merge(Columns, #{CollKey => Item}),
  228. case
  229. ?RAISE(
  230. match_conditions(InCase, ColumnsAndItem),
  231. {match_incase_error, {EXCLASS, EXCPTION, ST}}
  232. )
  233. of
  234. true when DoEach == [] -> {true, ColumnsAndItem};
  235. true ->
  236. {true,
  237. ?RAISE(
  238. select_and_transform(DoEach, ColumnsAndItem),
  239. {doeach_error, {EXCLASS, EXCPTION, ST}}
  240. )};
  241. false ->
  242. false
  243. end
  244. end,
  245. CollVal
  246. ).
  247. %% Conditional Clauses such as WHERE, WHEN.
  248. match_conditions({'and', L, R}, Data) ->
  249. match_conditions(L, Data) andalso match_conditions(R, Data);
  250. match_conditions({'or', L, R}, Data) ->
  251. match_conditions(L, Data) orelse match_conditions(R, Data);
  252. match_conditions({'not', Var}, Data) ->
  253. case eval(Var, Data) of
  254. Bool when is_boolean(Bool) ->
  255. not Bool;
  256. _Other ->
  257. false
  258. end;
  259. match_conditions({in, Var, {list, Vals}}, Data) ->
  260. lists:member(eval(Var, Data), [eval(V, Data) || V <- Vals]);
  261. match_conditions({'fun', {_, Name}, Args}, Data) ->
  262. apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
  263. match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
  264. compare(Op, eval(L, Data), eval(R, Data));
  265. match_conditions({}, _Data) ->
  266. true.
  267. %% compare to an undefined variable
  268. compare(Op, undefined, undefined) ->
  269. do_compare(Op, undefined, undefined);
  270. compare(_Op, L, R) when L == undefined; R == undefined ->
  271. false;
  272. %% comparing numbers against strings
  273. compare(Op, L, R) when is_number(L), is_binary(R) ->
  274. do_compare(Op, L, number(R));
  275. compare(Op, L, R) when is_binary(L), is_number(R) ->
  276. do_compare(Op, number(L), R);
  277. compare(Op, L, R) when is_atom(L), is_binary(R) ->
  278. do_compare(Op, atom_to_binary(L, utf8), R);
  279. compare(Op, L, R) when is_binary(L), is_atom(R) ->
  280. do_compare(Op, L, atom_to_binary(R, utf8));
  281. compare(Op, L, R) ->
  282. do_compare(Op, L, R).
  283. do_compare('=', L, R) -> L == R;
  284. do_compare('>', L, R) -> L > R;
  285. do_compare('<', L, R) -> L < R;
  286. do_compare('<=', L, R) -> L =< R;
  287. do_compare('>=', L, R) -> L >= R;
  288. do_compare('<>', L, R) -> L /= R;
  289. do_compare('!=', L, R) -> L /= R;
  290. do_compare('=~', T, F) -> emqx_topic:match(T, F).
  291. number(Bin) ->
  292. try
  293. binary_to_integer(Bin)
  294. catch
  295. error:badarg -> binary_to_float(Bin)
  296. end.
  297. handle_action_list(RuleId, Actions, Selected, Envs) ->
  298. [handle_action(RuleId, Act, Selected, Envs) || Act <- Actions].
  299. handle_action(RuleId, ActId, Selected, Envs) ->
  300. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
  301. try
  302. do_handle_action(RuleId, ActId, Selected, Envs)
  303. catch
  304. throw:out_of_service ->
  305. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
  306. ok = emqx_metrics_worker:inc(
  307. rule_metrics, RuleId, 'actions.failed.out_of_service'
  308. ),
  309. ?SLOG(warning, #{msg => "out_of_service", action => ActId});
  310. Err:Reason:ST ->
  311. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
  312. ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'),
  313. ?SLOG(error, #{
  314. msg => "action_failed",
  315. action => ActId,
  316. exception => Err,
  317. reason => Reason,
  318. stacktrace => ST
  319. })
  320. end.
  321. -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target).
  322. do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
  323. ?TRACE(
  324. "BRIDGE",
  325. "bridge_action",
  326. #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
  327. ),
  328. ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
  329. case
  330. emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
  331. of
  332. {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
  333. throw(out_of_service);
  334. ?RESOURCE_ERROR_M(R, _) when ?IS_RES_DOWN(R) ->
  335. throw(out_of_service);
  336. Result ->
  337. Result
  338. end;
  339. do_handle_action(
  340. RuleId,
  341. {bridge_v2, BridgeType, BridgeName},
  342. Selected,
  343. _Envs
  344. ) ->
  345. ?TRACE(
  346. "BRIDGE",
  347. "bridge_action",
  348. #{bridge_id => {bridge_v2, BridgeType, BridgeName}}
  349. ),
  350. ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
  351. case
  352. emqx_bridge_v2:send_message(
  353. BridgeType,
  354. BridgeName,
  355. Selected,
  356. #{reply_to => ReplyTo}
  357. )
  358. of
  359. {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
  360. throw(out_of_service);
  361. ?RESOURCE_ERROR_M(R, _) when ?IS_RES_DOWN(R) ->
  362. throw(out_of_service);
  363. Result ->
  364. Result
  365. end;
  366. do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) ->
  367. %% the function can also throw 'out_of_service'
  368. Args = maps:get(args, Action, []),
  369. Result = Mod:Func(Selected, Envs, Args),
  370. inc_action_metrics(RuleId, Result),
  371. Result.
  372. eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) ->
  373. case Context of
  374. [Columns] ->
  375. eval(Exp, Columns);
  376. [Columns | Rest] ->
  377. case eval(Exp, Columns) of
  378. undefined -> eval(Exp, Rest);
  379. Val -> Val
  380. end
  381. end;
  382. eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
  383. nested_get({path, Path}, maybe_decode_payload(Payload));
  384. eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
  385. nested_get({path, Path}, maybe_decode_payload(Payload));
  386. eval({path, _} = Path, Columns) ->
  387. nested_get(Path, Columns);
  388. eval({range, {Begin, End}}, _Columns) ->
  389. range_gen(Begin, End);
  390. eval({get_range, {Begin, End}, Data}, Columns) ->
  391. range_get(Begin, End, eval(Data, Columns));
  392. eval({var, _} = Var, Columns) ->
  393. nested_get(Var, Columns);
  394. eval({const, Val}, _Columns) ->
  395. Val;
  396. %% unary add
  397. eval({'+', L}, Columns) ->
  398. eval(L, Columns);
  399. %% unary subtract
  400. eval({'-', L}, Columns) ->
  401. -(eval(L, Columns));
  402. eval({Op, L, R}, Columns) when ?is_arith(Op) ->
  403. apply_func(Op, [eval(L, Columns), eval(R, Columns)], Columns);
  404. eval({Op, L, R}, Columns) when ?is_comp(Op) ->
  405. compare(Op, eval(L, Columns), eval(R, Columns));
  406. eval({list, List}, Columns) ->
  407. [eval(L, Columns) || L <- List];
  408. eval({'case', <<>>, CaseClauses, ElseClauses}, Columns) ->
  409. eval_case_clauses(CaseClauses, ElseClauses, Columns);
  410. eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) ->
  411. eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns);
  412. eval({'fun', {_, Name}, Args}, Columns) ->
  413. apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
  414. %% the payload maybe is JSON data, decode it to a `map` first for nested put
  415. ensure_decoded_payload({path, [{key, payload} | _]}, #{payload := Payload} = Columns) ->
  416. Columns#{payload => maybe_decode_payload(Payload)};
  417. ensure_decoded_payload(
  418. {path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns
  419. ) ->
  420. Columns#{<<"payload">> => maybe_decode_payload(Payload)};
  421. ensure_decoded_payload(_, Columns) ->
  422. Columns.
  423. alias({var, Var}, _Columns) ->
  424. {var, Var};
  425. alias({const, Val}, _Columns) when is_binary(Val) ->
  426. {var, Val};
  427. alias({list, L}, _Columns) ->
  428. {var, ?ephemeral_alias(list, length(L))};
  429. alias({range, R}, _Columns) ->
  430. {var, ?ephemeral_alias(range, R)};
  431. alias({get_range, _, {var, Key}}, _Columns) ->
  432. {var, Key};
  433. alias({get_range, _, {path, _Path} = Path}, Columns) ->
  434. handle_path_alias(Path, Columns);
  435. alias({path, _Path} = Path, Columns) ->
  436. handle_path_alias(Path, Columns);
  437. alias({const, Val}, _Columns) ->
  438. {var, ?ephemeral_alias(const, Val)};
  439. alias({Op, _L, _R}, _Columns) when ?is_arith(Op); ?is_comp(Op) ->
  440. {var, ?ephemeral_alias(op, Op)};
  441. alias({'case', On, _, _}, _Columns) ->
  442. {var, ?ephemeral_alias('case', On)};
  443. alias({'fun', Name, _}, _Columns) ->
  444. {var, ?ephemeral_alias('fun', Name)};
  445. alias(_, _Columns) ->
  446. ?ephemeral_alias(unknown, unknown).
  447. handle_path_alias({path, [{key, <<"payload">>} | Rest]}, #{payload := _Payload} = _Columns) ->
  448. {path, [{key, payload} | Rest]};
  449. handle_path_alias(Path, _Columns) ->
  450. Path.
  451. eval_case_clauses([], ElseClauses, Columns) ->
  452. case ElseClauses of
  453. {} -> undefined;
  454. _ -> eval(ElseClauses, Columns)
  455. end;
  456. eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Columns) ->
  457. case match_conditions(Cond, Columns) of
  458. true ->
  459. eval(Clause, Columns);
  460. _ ->
  461. eval_case_clauses(CaseClauses, ElseClauses, Columns)
  462. end.
  463. eval_switch_clauses(_CaseOn, [], ElseClauses, Columns) ->
  464. case ElseClauses of
  465. {} -> undefined;
  466. _ -> eval(ElseClauses, Columns)
  467. end;
  468. eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Columns) ->
  469. ConResult = eval(Cond, Columns),
  470. case eval(CaseOn, Columns) of
  471. ConResult ->
  472. eval(Clause, Columns);
  473. _ ->
  474. eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Columns)
  475. end.
  476. apply_func(Name, Args, Columns) when is_binary(Name) ->
  477. FuncName = parse_function_name(?DEFAULT_SQL_FUNC_PROVIDER, Name),
  478. apply_func(FuncName, Args, Columns);
  479. apply_func([{key, ModuleName0}, {key, FuncName0}], Args, Columns) ->
  480. ModuleName = parse_module_name(ModuleName0),
  481. FuncName = parse_function_name(ModuleName, FuncName0),
  482. do_apply_func(ModuleName, FuncName, Args, Columns);
  483. apply_func(Name, Args, Columns) when is_atom(Name) ->
  484. do_apply_func(?DEFAULT_SQL_FUNC_PROVIDER, Name, Args, Columns);
  485. apply_func(Other, _, _) ->
  486. ?RAISE_BAD_SQL(#{
  487. reason => bad_sql_function_reference,
  488. reference => Other
  489. }).
  490. do_apply_func(Module, Name, Args, Columns) ->
  491. try
  492. case erlang:apply(Module, Name, Args) of
  493. Func when is_function(Func) ->
  494. erlang:apply(Func, [Columns]);
  495. Result ->
  496. Result
  497. end
  498. catch
  499. error:function_clause ->
  500. ?RAISE_BAD_SQL(#{
  501. reason => bad_sql_function_argument,
  502. arguments => Args,
  503. function_name => Name
  504. })
  505. end.
  506. add_metadata(Columns, Metadata) when is_map(Columns), is_map(Metadata) ->
  507. NewMetadata = maps:merge(maps:get(metadata, Columns, #{}), Metadata),
  508. Columns#{metadata => NewMetadata}.
  509. %%------------------------------------------------------------------------------
  510. %% Internal Functions
  511. %%------------------------------------------------------------------------------
  512. maybe_decode_payload(Payload) when is_binary(Payload) ->
  513. case get_cached_payload() of
  514. undefined -> safe_decode_and_cache(Payload);
  515. DecodedP -> DecodedP
  516. end;
  517. maybe_decode_payload(Payload) ->
  518. Payload.
  519. get_cached_payload() ->
  520. erlang:get(rule_payload).
  521. cache_payload(DecodedP) ->
  522. erlang:put(rule_payload, DecodedP),
  523. DecodedP.
  524. safe_decode_and_cache(MaybeJson) ->
  525. try
  526. cache_payload(emqx_utils_json:decode(MaybeJson, [return_maps]))
  527. catch
  528. _:_ -> error({decode_json_failed, MaybeJson})
  529. end.
  530. ensure_list(List) when is_list(List) -> List;
  531. ensure_list(_NotList) -> [].
  532. nested_put(Alias, Val, Columns0) ->
  533. Columns = ensure_decoded_payload(Alias, Columns0),
  534. emqx_rule_maps:nested_put(Alias, Val, Columns).
  535. inc_action_metrics(RuleId, Result) ->
  536. _ = do_inc_action_metrics(RuleId, Result),
  537. Result.
  538. do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) ->
  539. emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
  540. do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) ->
  541. emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
  542. emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
  543. do_inc_action_metrics(RuleId, R) ->
  544. case is_ok_result(R) of
  545. false ->
  546. emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
  547. emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
  548. true ->
  549. emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
  550. end.
  551. is_ok_result(ok) ->
  552. true;
  553. is_ok_result({async_return, R}) ->
  554. is_ok_result(R);
  555. is_ok_result(R) when is_tuple(R) ->
  556. ok == erlang:element(1, R);
  557. is_ok_result(_) ->
  558. false.
  559. parse_module_name(Name) when is_binary(Name) ->
  560. case ?IS_VALID_SQL_FUNC_PROVIDER_MODULE_NAME(Name) of
  561. true ->
  562. ok;
  563. false ->
  564. ?RAISE_BAD_SQL(#{
  565. reason => sql_function_provider_module_not_allowed,
  566. module => Name
  567. })
  568. end,
  569. try
  570. parse_module_name(binary_to_existing_atom(Name, utf8))
  571. catch
  572. error:badarg ->
  573. ?RAISE_BAD_SQL(#{
  574. reason => sql_function_provider_module_not_loaded,
  575. module => Name
  576. })
  577. end;
  578. parse_module_name(Name) when is_atom(Name) ->
  579. Name.
  580. parse_function_name(Module, Name) when is_binary(Name) ->
  581. try
  582. parse_function_name(Module, binary_to_existing_atom(Name, utf8))
  583. catch
  584. error:badarg ->
  585. ?RAISE_BAD_SQL(#{
  586. reason => sql_function_not_supported,
  587. module => Module,
  588. function => Name
  589. })
  590. end;
  591. parse_function_name(_Module, Name) when is_atom(Name) ->
  592. Name.