|
|
@@ -39,7 +39,7 @@
|
|
|
|
|
|
%% Internal functions; exported for tests
|
|
|
-export([
|
|
|
- evaluate_sql_check/2
|
|
|
+ evaluate_sql_check/3
|
|
|
]).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -210,11 +210,15 @@ parse_sql_check(SQL) ->
|
|
|
%% Internal functions
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-evaluate_sql_check(Check, Message) ->
|
|
|
+evaluate_sql_check(Check, Validation, Message) ->
|
|
|
#{
|
|
|
fields := Fields,
|
|
|
conditions := Conditions
|
|
|
} = Check,
|
|
|
+ #{
|
|
|
+ name := Name,
|
|
|
+ log_failure_at := FailureLogLevel
|
|
|
+ } = Validation,
|
|
|
{Data, _} = emqx_rule_events:eventmsg_publish(Message),
|
|
|
try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of
|
|
|
{ok, _} ->
|
|
|
@@ -222,16 +226,38 @@ evaluate_sql_check(Check, Message) ->
|
|
|
false ->
|
|
|
false
|
|
|
catch
|
|
|
- throw:_Reason ->
|
|
|
- %% TODO: log?
|
|
|
+ throw:Reason ->
|
|
|
+ ?TRACE(
|
|
|
+ FailureLogLevel,
|
|
|
+ ?TRACE_TAG,
|
|
|
+ "validation_sql_check_throw",
|
|
|
+ #{
|
|
|
+ validation => Name,
|
|
|
+ reason => Reason
|
|
|
+ }
|
|
|
+ ),
|
|
|
false;
|
|
|
- _Class:_Error:_Stacktrace ->
|
|
|
- %% TODO: log?
|
|
|
+ Class:Error:Stacktrace ->
|
|
|
+ ?TRACE(
|
|
|
+ FailureLogLevel,
|
|
|
+ ?TRACE_TAG,
|
|
|
+ "validation_sql_check_failure",
|
|
|
+ #{
|
|
|
+ validation => Name,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }
|
|
|
+ ),
|
|
|
false
|
|
|
end.
|
|
|
|
|
|
-evaluate_schema_check(Check, #message{payload = Data}) ->
|
|
|
+evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
|
|
|
#{schema := SerdeName} = Check,
|
|
|
+ #{
|
|
|
+ name := Name,
|
|
|
+ log_failure_at := FailureLogLevel
|
|
|
+ } = Validation,
|
|
|
ExtraArgs =
|
|
|
case Check of
|
|
|
#{type := protobuf, message_name := MessageName} ->
|
|
|
@@ -243,9 +269,29 @@ evaluate_schema_check(Check, #message{payload = Data}) ->
|
|
|
emqx_schema_registry_serde:handle_rule_function(schema_check, [SerdeName, Data | ExtraArgs])
|
|
|
catch
|
|
|
error:{serde_not_found, _} ->
|
|
|
+ ?TRACE(
|
|
|
+ FailureLogLevel,
|
|
|
+ ?TRACE_TAG,
|
|
|
+ "validation_schema_check_schema_not_found",
|
|
|
+ #{
|
|
|
+ validation => Name,
|
|
|
+ schema_name => SerdeName
|
|
|
+ }
|
|
|
+ ),
|
|
|
false;
|
|
|
- _Class:_Error:_Stacktrace ->
|
|
|
- %% TODO: log?
|
|
|
+ Class:Error:Stacktrace ->
|
|
|
+ ?TRACE(
|
|
|
+ FailureLogLevel,
|
|
|
+ ?TRACE_TAG,
|
|
|
+ "validation_schema_check_failure",
|
|
|
+ #{
|
|
|
+ validation => Name,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }
|
|
|
+ ),
|
|
|
false
|
|
|
end.
|
|
|
|
|
|
@@ -356,7 +402,7 @@ run_validation(#{strategy := all_pass} = Validation, Message) ->
|
|
|
failure_action := FailureAction
|
|
|
} = Validation,
|
|
|
Fun = fun(Check, Acc) ->
|
|
|
- case run_check(Check, Message) of
|
|
|
+ case run_check(Check, Validation, Message) of
|
|
|
true -> {cont, Acc};
|
|
|
false -> {halt, FailureAction}
|
|
|
end
|
|
|
@@ -367,14 +413,14 @@ run_validation(#{strategy := any_pass} = Validation, Message) ->
|
|
|
checks := Checks,
|
|
|
failure_action := FailureAction
|
|
|
} = Validation,
|
|
|
- case lists:any(fun(C) -> run_check(C, Message) end, Checks) of
|
|
|
+ case lists:any(fun(C) -> run_check(C, Validation, Message) end, Checks) of
|
|
|
true ->
|
|
|
ok;
|
|
|
false ->
|
|
|
FailureAction
|
|
|
end.
|
|
|
|
|
|
-run_check(#{type := sql} = Check, Message) ->
|
|
|
- evaluate_sql_check(Check, Message);
|
|
|
-run_check(Check, Message) ->
|
|
|
- evaluate_schema_check(Check, Message).
|
|
|
+run_check(#{type := sql} = Check, Validation, Message) ->
|
|
|
+ evaluate_sql_check(Check, Validation, Message);
|
|
|
+run_check(Check, Validation, Message) ->
|
|
|
+ evaluate_schema_check(Check, Validation, Message).
|