|
|
@@ -3,6 +3,7 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
-module(emqx_message_validation).
|
|
|
|
|
|
+-include_lib("snabbkaffe/include/trace.hrl").
|
|
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
|
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
@@ -214,10 +215,7 @@ evaluate_sql_check(Check, Validation, Message) ->
|
|
|
fields := Fields,
|
|
|
conditions := Conditions
|
|
|
} = Check,
|
|
|
- #{
|
|
|
- name := Name,
|
|
|
- log_failure := #{level := FailureLogLevel}
|
|
|
- } = Validation,
|
|
|
+ #{name := Name} = Validation,
|
|
|
{Data, _} = emqx_rule_events:eventmsg_publish(Message),
|
|
|
try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of
|
|
|
{ok, _} ->
|
|
|
@@ -226,37 +224,24 @@ evaluate_sql_check(Check, Validation, Message) ->
|
|
|
false
|
|
|
catch
|
|
|
throw:Reason ->
|
|
|
- ?TRACE(
|
|
|
- FailureLogLevel,
|
|
|
- ?TRACE_TAG,
|
|
|
- "validation_sql_check_throw",
|
|
|
- #{
|
|
|
- validation => Name,
|
|
|
- reason => Reason
|
|
|
- }
|
|
|
- ),
|
|
|
+ trace_failure(Validation, "validation_sql_check_throw", #{
|
|
|
+ validation => Name,
|
|
|
+ reason => Reason
|
|
|
+ }),
|
|
|
false;
|
|
|
Class:Error:Stacktrace ->
|
|
|
- ?TRACE(
|
|
|
- FailureLogLevel,
|
|
|
- ?TRACE_TAG,
|
|
|
- "validation_sql_check_failure",
|
|
|
- #{
|
|
|
- validation => Name,
|
|
|
- kind => Class,
|
|
|
- reason => Error,
|
|
|
- stacktrace => Stacktrace
|
|
|
- }
|
|
|
- ),
|
|
|
+ trace_failure(Validation, "validation_sql_check_failure", #{
|
|
|
+ validation => Name,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }),
|
|
|
false
|
|
|
end.
|
|
|
|
|
|
evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
|
|
|
#{schema := SerdeName} = Check,
|
|
|
- #{
|
|
|
- name := Name,
|
|
|
- log_failure := #{level := FailureLogLevel}
|
|
|
- } = Validation,
|
|
|
+ #{name := Name} = Validation,
|
|
|
ExtraArgs =
|
|
|
case Check of
|
|
|
#{type := protobuf, message_name := MessageName} ->
|
|
|
@@ -268,29 +253,19 @@ evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
|
|
|
emqx_schema_registry_serde:schema_check(SerdeName, Data, ExtraArgs)
|
|
|
catch
|
|
|
error:{serde_not_found, _} ->
|
|
|
- ?TRACE(
|
|
|
- FailureLogLevel,
|
|
|
- ?TRACE_TAG,
|
|
|
- "validation_schema_check_schema_not_found",
|
|
|
- #{
|
|
|
- validation => Name,
|
|
|
- schema_name => SerdeName
|
|
|
- }
|
|
|
- ),
|
|
|
+ trace_failure(Validation, "validation_schema_check_schema_not_found", #{
|
|
|
+ validation => Name,
|
|
|
+ schema_name => SerdeName
|
|
|
+ }),
|
|
|
false;
|
|
|
Class:Error:Stacktrace ->
|
|
|
- ?TRACE(
|
|
|
- FailureLogLevel,
|
|
|
- ?TRACE_TAG,
|
|
|
- "validation_schema_check_failure",
|
|
|
- #{
|
|
|
- validation => Name,
|
|
|
- schema_name => SerdeName,
|
|
|
- kind => Class,
|
|
|
- reason => Error,
|
|
|
- stacktrace => Stacktrace
|
|
|
- }
|
|
|
- ),
|
|
|
+ trace_failure(Validation, "validation_schema_check_failure", #{
|
|
|
+ validation => Name,
|
|
|
+ schema_name => SerdeName,
|
|
|
+ kind => Class,
|
|
|
+ reason => Error,
|
|
|
+ stacktrace => Stacktrace
|
|
|
+ }),
|
|
|
false
|
|
|
end.
|
|
|
|
|
|
@@ -404,20 +379,15 @@ run_validations(Validations, Message) ->
|
|
|
try
|
|
|
emqx_rule_runtime:clear_rule_payload(),
|
|
|
Fun = fun(Validation, Acc) ->
|
|
|
- #{
|
|
|
- name := Name,
|
|
|
- log_failure := #{level := FailureLogLevel}
|
|
|
- } = Validation,
|
|
|
+ #{name := Name} = Validation,
|
|
|
case run_validation(Validation, Message) of
|
|
|
ok ->
|
|
|
{cont, Acc};
|
|
|
FailureAction ->
|
|
|
- ?TRACE(
|
|
|
- FailureLogLevel,
|
|
|
- ?TRACE_TAG,
|
|
|
- "validation_failed",
|
|
|
- #{validation => Name, action => FailureAction}
|
|
|
- ),
|
|
|
+ trace_failure(Validation, "validation_failed", #{
|
|
|
+ validation => Name,
|
|
|
+ action => FailureAction
|
|
|
+ }),
|
|
|
{halt, FailureAction}
|
|
|
end
|
|
|
end,
|
|
|
@@ -454,3 +424,18 @@ run_check(#{type := sql} = Check, Validation, Message) ->
|
|
|
evaluate_sql_check(Check, Validation, Message);
|
|
|
run_check(Check, Validation, Message) ->
|
|
|
evaluate_schema_check(Check, Validation, Message).
|
|
|
+
|
|
|
+trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) ->
|
|
|
+ #{
|
|
|
+ name := _Name,
|
|
|
+ failure_action := _Action
|
|
|
+ } = Validation,
|
|
|
+ ?tp(message_validation_failure, #{log_level => none, name => _Name, action => _Action}),
|
|
|
+ ok;
|
|
|
+trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) ->
|
|
|
+ #{
|
|
|
+ name := _Name,
|
|
|
+ failure_action := _Action
|
|
|
+ } = Validation,
|
|
|
+ ?tp(message_validation_failure, #{log_level => Level, name => _Name, action => _Action}),
|
|
|
+ ?TRACE(Level, ?TRACE_TAG, Msg, Meta).
|