| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_rule_runtime).
- -include("rule_engine.hrl").
- -include_lib("emqx/include/emqx.hrl").
- -include_lib("emqx/include/logger.hrl").
- -export([ apply_rule/2
- , apply_rules/2
- , clear_rule_payload/0
- ]).
- -import(emqx_rule_maps,
- [ nested_get/2
- , range_gen/2
- , range_get/3
- ]).
- -type(input() :: map()).
- -type(alias() :: atom()).
- -type(collection() :: {alias(), [term()]}).
- -define(ephemeral_alias(TYPE, NAME),
- iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))).
- -define(ActionMaxRetry, 3).
- %%------------------------------------------------------------------------------
- %% Apply rules
- %%------------------------------------------------------------------------------
- -spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok).
- apply_rules([], _Input) ->
- ok;
- apply_rules([#rule{enabled = false}|More], Input) ->
- apply_rules(More, Input);
- apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
- try apply_rule_discard_result(Rule, Input)
- catch
- %% ignore the errors if select or match failed
- _:{select_and_transform_error, Error} ->
- ?LOG(warning, "SELECT clause exception for ~s failed: ~p",
- [RuleID, Error]);
- _:{match_conditions_error, Error} ->
- ?LOG(warning, "WHERE clause exception for ~s failed: ~p",
- [RuleID, Error]);
- _:{select_and_collect_error, Error} ->
- ?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
- [RuleID, Error]);
- _:{match_incase_error, Error} ->
- ?LOG(warning, "INCASE clause exception for ~s failed: ~p",
- [RuleID, Error]);
- _:Error:StkTrace ->
- ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
- [RuleID, Error, StkTrace])
- end,
- apply_rules(More, Input).
- apply_rule_discard_result(Rule, Input) ->
- _ = apply_rule(Rule, Input),
- ok.
- apply_rule(Rule = #rule{id = RuleID}, Input) ->
- clear_rule_payload(),
- do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
- do_apply_rule(#rule{id = RuleId,
- is_foreach = true,
- fields = Fields,
- doeach = DoEach,
- incase = InCase,
- conditions = Conditions,
- on_action_failed = OnFailed,
- actions = Actions}, Input) ->
- {Selected, Collection} = ?RAISE(select_and_collect(Fields, Input),
- {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
- ColumnsAndSelected = maps:merge(Input, Selected),
- case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
- {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
- true ->
- ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
- Collection2 = filter_collection(Input, InCase, DoEach, Collection),
- {ok, [take_actions(Actions, Coll, Input, OnFailed) || Coll <- Collection2]};
- false ->
- {error, nomatch}
- end;
- do_apply_rule(#rule{id = RuleId,
- is_foreach = false,
- fields = Fields,
- conditions = Conditions,
- on_action_failed = OnFailed,
- actions = Actions}, Input) ->
- Selected = ?RAISE(select_and_transform(Fields, Input),
- {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
- case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
- {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
- true ->
- ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
- {ok, take_actions(Actions, Selected, Input, OnFailed)};
- false ->
- {error, nomatch}
- end.
- clear_rule_payload() ->
- erlang:erase(rule_payload).
- %% SELECT Clause
- select_and_transform(Fields, Input) ->
- select_and_transform(Fields, Input, #{}).
- select_and_transform([], _Input, Output) ->
- Output;
- select_and_transform(['*'|More], Input, Output) ->
- select_and_transform(More, Input, maps:merge(Output, Input));
- select_and_transform([{as, Field, Alias}|More], Input, Output) ->
- Val = eval(Field, Input),
- select_and_transform(More,
- nested_put(Alias, Val, Input),
- nested_put(Alias, Val, Output));
- select_and_transform([Field|More], Input, Output) ->
- Val = eval(Field, Input),
- Key = alias(Field),
- select_and_transform(More,
- nested_put(Key, Val, Input),
- nested_put(Key, Val, Output)).
- %% FOREACH Clause
- -spec select_and_collect(list(), input()) -> {input(), collection()}.
- select_and_collect(Fields, Input) ->
- select_and_collect(Fields, Input, {#{}, {'item', []}}).
- select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) ->
- Val = eval(Field, Input),
- {nested_put(Alias, Val, Output), {A, ensure_list(Val)}};
- select_and_collect([{as, Field, Alias}|More], Input, {Output, LastKV}) ->
- Val = eval(Field, Input),
- select_and_collect(More,
- nested_put(Alias, Val, Input),
- {nested_put(Alias, Val, Output), LastKV});
- select_and_collect([Field], Input, {Output, _}) ->
- Val = eval(Field, Input),
- Key = alias(Field),
- {nested_put(Key, Val, Output), {'item', ensure_list(Val)}};
- select_and_collect([Field|More], Input, {Output, LastKV}) ->
- Val = eval(Field, Input),
- Key = alias(Field),
- select_and_collect(More,
- nested_put(Key, Val, Input),
- {nested_put(Key, Val, Output), LastKV}).
- %% Filter each item got from FOREACH
- -dialyzer({nowarn_function, filter_collection/4}).
- filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
- lists:filtermap(
- fun(Item) ->
- InputAndItem = maps:merge(Input, #{CollKey => Item}),
- case ?RAISE(match_conditions(InCase, InputAndItem),
- {match_incase_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
- true when DoEach == [] -> {true, InputAndItem};
- true ->
- {true, ?RAISE(select_and_transform(DoEach, InputAndItem),
- {doeach_error, {_EXCLASS_,_EXCPTION_,_ST_}})};
- false -> false
- end
- end, CollVal).
- %% Conditional Clauses such as WHERE, WHEN.
- match_conditions({'and', L, R}, Data) ->
- match_conditions(L, Data) andalso match_conditions(R, Data);
- match_conditions({'or', L, R}, Data) ->
- match_conditions(L, Data) orelse match_conditions(R, Data);
- match_conditions({'not', Var}, Data) ->
- case eval(Var, Data) of
- Bool when is_boolean(Bool) ->
- not Bool;
- _other -> false
- end;
- match_conditions({in, Var, {list, Vals}}, Data) ->
- lists:member(eval(Var, Data), [eval(V, Data) || V <- Vals]);
- match_conditions({'fun', {_, Name}, Args}, Data) ->
- apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
- match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
- compare(Op, eval(L, Data), eval(R, Data));
- %%match_conditions({'like', Var, Pattern}, Data) ->
- %% match_like(eval(Var, Data), Pattern);
- match_conditions({}, _Data) ->
- true.
- %% comparing numbers against strings
- compare(Op, L, R) when is_number(L), is_binary(R) ->
- do_compare(Op, L, number(R));
- compare(Op, L, R) when is_binary(L), is_number(R) ->
- do_compare(Op, number(L), R);
- compare(Op, L, R) when is_atom(L), is_binary(R) ->
- do_compare(Op, atom_to_binary(L, utf8), R);
- compare(Op, L, R) when is_binary(L), is_atom(R) ->
- do_compare(Op, L, atom_to_binary(R, utf8));
- compare(Op, L, R) ->
- do_compare(Op, L, R).
- do_compare('=', L, R) -> L == R;
- do_compare('>', L, R) -> L > R;
- do_compare('<', L, R) -> L < R;
- do_compare('<=', L, R) -> L =< R;
- do_compare('>=', L, R) -> L >= R;
- do_compare('<>', L, R) -> L /= R;
- do_compare('!=', L, R) -> L /= R;
- do_compare('=~', T, F) -> emqx_topic:match(T, F).
- number(Bin) ->
- try binary_to_integer(Bin)
- catch error:badarg -> binary_to_float(Bin)
- end.
- %% Step3 -> Take actions
- take_actions(Actions, Selected, Envs, OnFailed) ->
- [take_action(ActInst, Selected, Envs, OnFailed, ?ActionMaxRetry)
- || ActInst <- Actions].
- take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
- Selected, Envs, OnFailed, RetryN) when RetryN >= 0 ->
- try
- {ok, #action_instance_params{apply = Apply}}
- = emqx_rule_registry:get_action_instance_params(Id),
- emqx_rule_metrics:inc_actions_taken(Id),
- apply_action_func(Selected, Envs, Apply, ActName)
- of
- {badact, Reason} ->
- handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason);
- Result -> Result
- catch
- error:{badfun, _Func}:_ST ->
- %?LOG(warning, "Action ~p maybe outdated, refresh it and try again."
- % "Func: ~p~nST:~0p", [Id, Func, ST]),
- _ = trans_action_on(Id, fun() ->
- emqx_rule_engine:refresh_actions([ActInst])
- end, 5000),
- emqx_rule_metrics:inc_actions_retry(Id),
- take_action(ActInst, Selected, Envs, OnFailed, RetryN-1);
- Error:Reason:Stack ->
- emqx_rule_metrics:inc_actions_exception(Id),
- handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack})
- end;
- take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) ->
- emqx_rule_metrics:inc_actions_error(Id),
- handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}).
- apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) ->
- %% TODO: Build the Func Name when creating the action
- Func = cbk_on_action_triggered(Name),
- Mod:Func(Data, Envs#{'__bindings__' => Bindings});
- apply_action_func(Data, Envs, Func, _Name) when is_function(Func) ->
- erlang:apply(Func, [Data, Envs]).
- cbk_on_action_triggered(Name) ->
- list_to_atom("on_action_" ++ atom_to_list(Name)).
- trans_action_on(Id, Callback, Timeout) ->
- case emqx_rule_locker:lock(Id) of
- true -> try Callback() after emqx_rule_locker:unlock(Id) end;
- _ ->
- wait_action_on(Id, Timeout div 10)
- end.
- wait_action_on(_, 0) ->
- {error, timeout};
- wait_action_on(Id, RetryN) ->
- timer:sleep(10),
- case emqx_rule_registry:get_action_instance_params(Id) of
- not_found ->
- {error, not_found};
- {ok, #action_instance_params{apply = Apply}} ->
- case catch apply_action_func(baddata, #{}, Apply, tryit) of
- {'EXIT', {{badfun, _}, _}} ->
- wait_action_on(Id, RetryN-1);
- _ ->
- ok
- end
- end.
- handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) ->
- ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]),
- _ = take_actions(Fallbacks, Selected, Envs, continue),
- failed;
- handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) ->
- ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]),
- _ = take_actions(Fallbacks, Selected, Envs, continue),
- error({take_action_failed, {Id, Reason}}).
- eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
- nested_get({path, Path}, may_decode_payload(Payload));
- eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
- nested_get({path, Path}, may_decode_payload(Payload));
- eval({path, _} = Path, Input) ->
- nested_get(Path, Input);
- eval({range, {Begin, End}}, _Input) ->
- range_gen(Begin, End);
- eval({get_range, {Begin, End}, Data}, Input) ->
- range_get(Begin, End, eval(Data, Input));
- eval({var, _} = Var, Input) ->
- nested_get(Var, Input);
- eval({const, Val}, _Input) ->
- Val;
- %% unary add
- eval({'+', L}, Input) ->
- eval(L, Input);
- %% unary subtract
- eval({'-', L}, Input) ->
- -(eval(L, Input));
- eval({Op, L, R}, Input) when ?is_arith(Op) ->
- apply_func(Op, [eval(L, Input), eval(R, Input)], Input);
- eval({Op, L, R}, Input) when ?is_comp(Op) ->
- compare(Op, eval(L, Input), eval(R, Input));
- eval({list, List}, Input) ->
- [eval(L, Input) || L <- List];
- eval({'case', <<>>, CaseClauses, ElseClauses}, Input) ->
- eval_case_clauses(CaseClauses, ElseClauses, Input);
- eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
- eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
- eval({'fun', {_, Name}, Args}, Input) ->
- apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input).
- handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) ->
- Input#{payload => may_decode_payload(Payload)};
- handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) ->
- Input#{<<"payload">> => may_decode_payload(Payload)};
- handle_alias(_, Input) ->
- Input.
- alias({var, Var}) ->
- {var, Var};
- alias({const, Val}) when is_binary(Val) ->
- {var, Val};
- alias({list, L}) ->
- {var, ?ephemeral_alias(list, length(L))};
- alias({range, R}) ->
- {var, ?ephemeral_alias(range, R)};
- alias({get_range, _, {var, Key}}) ->
- {var, Key};
- alias({get_range, _, {path, Path}}) ->
- {path, Path};
- alias({path, Path}) ->
- {path, Path};
- alias({const, Val}) ->
- {var, ?ephemeral_alias(const, Val)};
- alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) ->
- {var, ?ephemeral_alias(op, Op)};
- alias({'case', On, _, _}) ->
- {var, ?ephemeral_alias('case', On)};
- alias({'fun', Name, _}) ->
- {var, ?ephemeral_alias('fun', Name)};
- alias(_) ->
- ?ephemeral_alias(unknown, unknown).
- eval_case_clauses([], ElseClauses, Input) ->
- case ElseClauses of
- {} -> undefined;
- _ -> eval(ElseClauses, Input)
- end;
- eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
- case match_conditions(Cond, Input) of
- true ->
- eval(Clause, Input);
- _ ->
- eval_case_clauses(CaseClauses, ElseClauses, Input)
- end.
- eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
- case ElseClauses of
- {} -> undefined;
- _ -> eval(ElseClauses, Input)
- end;
- eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
- ConResult = eval(Cond, Input),
- case eval(CaseOn, Input) of
- ConResult ->
- eval(Clause, Input);
- _ ->
- eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input)
- end.
- apply_func(Name, Args, Input) when is_atom(Name) ->
- do_apply_func(Name, Args, Input);
- apply_func(Name, Args, Input) when is_binary(Name) ->
- FunName =
- try binary_to_existing_atom(Name, utf8)
- catch error:badarg -> error({sql_function_not_supported, Name})
- end,
- do_apply_func(FunName, Args, Input).
- do_apply_func(Name, Args, Input) ->
- case erlang:apply(emqx_rule_funcs, Name, Args) of
- Func when is_function(Func) ->
- erlang:apply(Func, [Input]);
- Result -> Result
- end.
- add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
- NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata),
- Input#{metadata => NewMetadata}.
- %%------------------------------------------------------------------------------
- %% Internal Functions
- %%------------------------------------------------------------------------------
- may_decode_payload(Payload) when is_binary(Payload) ->
- case get_cached_payload() of
- undefined -> safe_decode_and_cache(Payload);
- DecodedP -> DecodedP
- end;
- may_decode_payload(Payload) ->
- Payload.
- get_cached_payload() ->
- erlang:get(rule_payload).
- cache_payload(DecodedP) ->
- erlang:put(rule_payload, DecodedP),
- DecodedP.
- safe_decode_and_cache(MaybeJson) ->
- try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
- catch _:_ -> #{}
- end.
- ensure_list(List) when is_list(List) -> List;
- ensure_list(_NotList) -> [].
- nested_put(Alias, Val, Input0) ->
- Input = handle_alias(Alias, Input0),
- emqx_rule_maps:nested_put(Alias, Val, Input).
|