|
|
@@ -50,26 +50,40 @@ apply_rules([], _Input) ->
|
|
|
ok;
|
|
|
apply_rules([#{enable := false}|More], Input) ->
|
|
|
apply_rules(More, Input);
|
|
|
-apply_rules([Rule = #{id := RuleID}|More], Input) ->
|
|
|
- try apply_rule_discard_result(Rule, Input)
|
|
|
+apply_rules([Rule|More], Input) ->
|
|
|
+ apply_rule_discard_result(Rule, Input),
|
|
|
+ apply_rules(More, Input).
|
|
|
+
|
|
|
+apply_rule_discard_result(Rule, Input) ->
|
|
|
+ _ = apply_rule(Rule, Input),
|
|
|
+ ok.
|
|
|
+
|
|
|
+apply_rule(Rule = #{id := RuleID}, Input) ->
|
|
|
+ ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'),
|
|
|
+ clear_rule_payload(),
|
|
|
+ try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
|
|
|
catch
|
|
|
%% ignore the errors if select or match failed
|
|
|
- _:{select_and_transform_error, Error} ->
|
|
|
+ _:Reason = {select_and_transform_error, Error} ->
|
|
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
|
?SLOG(warning, #{msg => "SELECT_clause_exception",
|
|
|
- rule_id => RuleID, reason => Error});
|
|
|
- _:{match_conditions_error, Error} ->
|
|
|
+ rule_id => RuleID, reason => Error}),
|
|
|
+ {error, Reason};
|
|
|
+ _:Reason = {match_conditions_error, Error} ->
|
|
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
|
?SLOG(warning, #{msg => "WHERE_clause_exception",
|
|
|
- rule_id => RuleID, reason => Error});
|
|
|
- _:{select_and_collect_error, Error} ->
|
|
|
+ rule_id => RuleID, reason => Error}),
|
|
|
+ {error, Reason};
|
|
|
+ _:Reason = {select_and_collect_error, Error} ->
|
|
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
|
?SLOG(warning, #{msg => "FOREACH_clause_exception",
|
|
|
- rule_id => RuleID, reason => Error});
|
|
|
- _:{match_incase_error, Error} ->
|
|
|
+ rule_id => RuleID, reason => Error}),
|
|
|
+ {error, Reason};
|
|
|
+ _:Reason = {match_incase_error, Error} ->
|
|
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
|
?SLOG(warning, #{msg => "INCASE_clause_exception",
|
|
|
- rule_id => RuleID, reason => Error});
|
|
|
+ rule_id => RuleID, reason => Error}),
|
|
|
+ {error, Reason};
|
|
|
Class:Error:StkTrace ->
|
|
|
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
|
|
?SLOG(error, #{msg => "apply_rule_failed",
|
|
|
@@ -77,18 +91,9 @@ apply_rules([Rule = #{id := RuleID}|More], Input) ->
|
|
|
exception => Class,
|
|
|
reason => Error,
|
|
|
stacktrace => StkTrace
|
|
|
- })
|
|
|
- end,
|
|
|
- apply_rules(More, Input).
|
|
|
-
|
|
|
-apply_rule_discard_result(Rule, Input) ->
|
|
|
- _ = apply_rule(Rule, Input),
|
|
|
- ok.
|
|
|
-
|
|
|
-apply_rule(Rule = #{id := RuleID}, Input) ->
|
|
|
- ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'),
|
|
|
- clear_rule_payload(),
|
|
|
- do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
|
|
|
+ }),
|
|
|
+ {error, {Error, StkTrace}}
|
|
|
+ end.
|
|
|
|
|
|
do_apply_rule(#{
|
|
|
id := RuleId,
|
|
|
@@ -413,7 +418,7 @@ cache_payload(DecodedP) ->
|
|
|
|
|
|
safe_decode_and_cache(MaybeJson) ->
|
|
|
try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
|
|
|
- catch _:_ -> #{}
|
|
|
+ catch _:_ -> error({decode_json_failed, MaybeJson})
|
|
|
end.
|
|
|
|
|
|
ensure_list(List) when is_list(List) -> List;
|