| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% EUnit tests for the message validation schema and SQL check evaluation.
-module(emqx_message_validation_tests).

-include_lib("eunit/include/eunit.hrl").

%% Config path of the validation list, as it appears in the `path' field of
%% schema errors that concern the list as a whole.
-define(VALIDATIONS_PATH, "message_validation.validations").
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
%% Nests `InnerConfigs' under `message_validation.validations', runs the
%% resulting raw config through `hocon_tconf:check_plain/3' against
%% `emqx_message_validation_schema', and returns the checked value found at
%% that same location. Whatever the schema check throws propagates to the
%% caller.
parse_and_check(InnerConfigs) ->
    Root = <<"message_validation">>,
    Inner = <<"validations">>,
    CheckOpts = #{
        required => false,
        atom_key => false,
        make_serializable => false
    },
    Raw = #{Root => #{Inner => InnerConfigs}},
    CheckedConf = hocon_tconf:check_plain(emqx_message_validation_schema, Raw, CheckOpts),
    %% Unwrap the two nesting levels added above.
    #{Root := #{Inner := Validations}} = CheckedConf,
    Validations.
%% Builds a raw validation config named `Name' with the given `Checks' and
%% default values for every other field.
validation(Name, Checks) ->
    validation(Name, Checks, #{}).

%% Same as validation/2, with `Overrides' deep-merged on top of the defaults.
validation(Name, Checks, Overrides) ->
    Base = #{
        <<"name">> => Name,
        <<"checks">> => Checks,
        <<"enable">> => true,
        <<"description">> => <<"my validation">>,
        <<"tags">> => [<<"some">>, <<"tags">>],
        <<"topics">> => <<"t/+">>,
        <<"strategy">> => <<"all_pass">>,
        <<"failure_action">> => <<"drop">>,
        <<"log_failure">> => #{<<"level">> => <<"warning">>}
    },
    emqx_utils_maps:deep_merge(Base, Overrides).
%% Raw SQL check config using a statement whose `where' clause is `true'.
sql_check() ->
    sql_check(<<"select * where true">>).

%% Raw SQL check config wrapping the given `SQL' statement.
sql_check(SQL) ->
    #{<<"sql">> => SQL, <<"type">> => <<"sql">>}.
%% Raw schema check config of the given `Type' referring to `SerdeName'.
schema_check(Type, SerdeName) ->
    schema_check(Type, SerdeName, #{}).

%% Same as schema_check/2, with `Overrides' deep-merged on top of the base
%% fields. `Type' is converted to a binary before being stored.
schema_check(Type, SerdeName, Overrides) ->
    TypeBin = emqx_utils_conv:bin(Type),
    Base = #{<<"schema">> => SerdeName, <<"type">> => TypeBin},
    emqx_utils_maps:deep_merge(Base, Overrides).
%% Parses `SQL' into a check and evaluates it against `Message', using a
%% minimal validation context (name and failure log level only).
%% Crashes with a badmatch if the statement does not parse to `{ok, _}'.
eval_sql(Message, SQL) ->
    Context = #{name => <<"validation">>, log_failure => #{level => warning}},
    {ok, ParsedCheck} = emqx_message_validation:parse_sql_check(SQL),
    emqx_message_validation:evaluate_sql_check(ParsedCheck, Context, Message).
%% Message built purely from the default fields.
message() ->
    message(#{}).

%% Builds a message from default fields with `Opts' deep-merged on top.
%% By default the topic is `t/t' and the payload is the JSON encoding of
%% `#{value => 10}'.
message(Opts) ->
    MsgId = emqx_guid:gen(),
    Sender = emqx_guid:to_hexstr(emqx_guid:gen()),
    Headers = #{
        proto_ver => v5,
        properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
    },
    Payload = emqx_utils_json:encode(#{value => 10}),
    Base = #{
        id => MsgId,
        qos => 0,
        from => Sender,
        flags => #{retain => false},
        headers => Headers,
        topic => <<"t/t">>,
        payload => Payload,
        timestamp => 1710272561615,
        extra => []
    },
    Merged = emqx_utils_maps:deep_merge(Base, Opts),
    emqx_message:from_map(Merged).
%%------------------------------------------------------------------------------
%% Test cases
%%------------------------------------------------------------------------------
%% Schema-level cases: `topics' always comes back as a list, and invalid
%% validation configs are rejected with a `validation_error'.
schema_test_() ->
    %% The raw configs are plain maps assembled by the helpers above, so they
    %% are built up front; only parse_and_check/1 (which may throw) runs
    %% inside the test funs.
    Name = <<"foo">>,
    TopicAsBinary = validation(Name, [sql_check()], #{<<"topics">> => <<"t/1">>}),
    TopicAsList = validation(Name, [sql_check()], #{<<"topics">> => [<<"t/1">>]}),
    WithForeach = validation(Name, [sql_check(<<"foreach foo as f where true">>)]),
    WithFromClause = validation(Name, [sql_check(<<"select * from t">>)]),
    SameNameTwice = [
        validation(Name, [sql_check()]),
        validation(Name, [sql_check()])
    ],
    WithoutChecks = validation(Name, []),
    WithBogusType = validation(Name, [#{<<"type">> => <<"foo">>}]),
    [
        {"topics is always a list 1",
            ?_assertMatch(
                [#{<<"topics">> := [<<"t/1">>]}],
                parse_and_check([TopicAsBinary])
            )},
        {"topics is always a list 2",
            ?_assertMatch(
                [#{<<"topics">> := [<<"t/1">>]}],
                parse_and_check([TopicAsList])
            )},
        {"foreach expression is not allowed",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := foreach_not_allowed,
                        kind := validation_error
                    }
                ]},
                parse_and_check([WithForeach])
            )},
        {"from clause is not allowed",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := non_empty_from_clause,
                        kind := validation_error
                    }
                ]},
                parse_and_check([WithFromClause])
            )},
        {"names are unique",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated name:", _/binary>>,
                        path := ?VALIDATIONS_PATH,
                        kind := validation_error
                    }
                ]},
                parse_and_check(SameNameTwice)
            )},
        {"checks must be non-empty",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := "at least one check must be defined",
                        kind := validation_error
                    }
                ]},
                parse_and_check([WithoutChecks])
            )},
        {"bogus check type",
            ?_assertThrow(
                {_Schema, [
                    #{
                        expected := <<"sql", _/binary>>,
                        kind := validation_error,
                        field_name := type
                    }
                ]},
                parse_and_check([WithBogusType])
            )}
    ].
%% One case per invalid validation name; each must be rejected by the name
%% regex check on the first validation entry. The name itself is used as the
%% test title.
invalid_names_test_() ->
    InvalidNames = [
        <<"">>,
        <<"_name">>,
        <<"name$">>,
        <<"name!">>,
        <<"some name">>,
        <<"nãme"/utf8>>,
        <<"test_哈哈"/utf8>>
    ],
    lists:map(
        fun(InvalidName) ->
            {InvalidName,
                ?_assertThrow(
                    {_Schema, [
                        #{
                            reason := <<"must conform to regex:", _/binary>>,
                            kind := validation_error,
                            path := "message_validation.validations.1.name"
                        }
                    ]},
                    parse_and_check([validation(InvalidName, [sql_check()])])
                )}
        end,
        InvalidNames
    ).
%% Evaluates SQL checks against the default message (JSON payload with
%% `value' set to 10): `pass' cases must evaluate to true, `deny' cases to
%% false.
check_test_() ->
    Cases = [
        {"denied by payload 1", deny, <<"select * where payload.value > 15">>},
        {"denied by payload 2", deny, <<"select payload.value as x where x > 15">>},
        {"allowed by payload 1", pass, <<"select * where payload.value > 5">>},
        {"allowed by payload 2", pass, <<"select payload.value as x where x > 5">>},
        {"always passes 1", pass, <<"select * where true">>},
        {"always passes 2", pass, <<"select * where 1 = 1">>},
        {"never passes 1", deny, <<"select * where false">>},
        {"never passes 2", deny, <<"select * where 1 = 2">>},
        {"never passes 3", deny, <<"select * where true and false">>}
    ],
    %% message() is called inside the generated test fun, so every case still
    %% builds its own fresh message when it runs.
    ToTest = fun
        (pass, SQL) -> ?_assert(eval_sql(message(), SQL));
        (deny, SQL) -> ?_assertNot(eval_sql(message(), SQL))
    end,
    [{Title, ToTest(Verdict, SQL)} || {Title, Verdict, SQL} <- Cases].
%% Duplicate detection inside a single validation:
%%   * repeated topics are rejected and listed in the error reason;
%%   * repeated SQL checks are accepted;
%%   * schema checks are rejected when the same type and schema name pair
%%     occurs more than once, while the same name under different types is
%%     accepted.
%% Every case has a distinct title so a failure report identifies exactly one
%% case.
duplicated_check_test_() ->
    [
        {"duplicated topics 1",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated topics: t/1">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.topics"
                    }
                ]},
                parse_and_check([
                    validation(
                        <<"foo">>,
                        [schema_check(json, <<"a">>)],
                        #{<<"topics">> => [<<"t/1">>, <<"t/1">>]}
                    )
                ])
            )},
        {"duplicated topics 2",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated topics: t/1">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.topics"
                    }
                ]},
                parse_and_check([
                    validation(
                        <<"foo">>,
                        [schema_check(json, <<"a">>)],
                        #{<<"topics">> => [<<"t/1">>, <<"t/#">>, <<"t/1">>]}
                    )
                ])
            )},
        {"duplicated topics 3",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated topics: t/1, t/2">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.topics"
                    }
                ]},
                parse_and_check([
                    validation(
                        <<"foo">>,
                        [schema_check(json, <<"a">>)],
                        #{
                            <<"topics">> => [
                                <<"t/1">>,
                                <<"t/#">>,
                                <<"t/1">>,
                                <<"t/2">>,
                                <<"t/2">>
                            ]
                        }
                    )
                ])
            )},
        {"duplicated sql checks are not checked",
            ?_assertMatch(
                [#{<<"checks">> := [_, _]}],
                parse_and_check([
                    validation(<<"foo">>, [sql_check(), sql_check()])
                ])
            )},
        {"different serdes with same name",
            ?_assertMatch(
                [#{<<"checks">> := [_, _, _]}],
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(avro, <<"a">>),
                        schema_check(
                            protobuf,
                            <<"a">>,
                            #{<<"message_name">> => <<"a">>}
                        )
                    ])
                ])
            )},
        {"duplicated serdes 1",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>)
                    ])
                ])
            )},
        {"duplicated serdes 2",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        sql_check(),
                        schema_check(json, <<"a">>)
                    ])
                ])
            )},
        {"duplicated serdes 3",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>),
                        sql_check()
                    ])
                ])
            )},
        {"duplicated serdes 4",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: json:a">>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>)
                    ])
                ])
            )},
        %% Two distinct duplicated pairs: only the reason prefix is matched
        %% here, not the list of duplicates that follows it.
        %% NOTE(review): presumably the order of the reported duplicates is
        %% not guaranteed - confirm against the schema module.
        {"duplicated serdes 5",
            ?_assertThrow(
                {_Schema, [
                    #{
                        reason := <<"duplicated schema checks: ", _/binary>>,
                        kind := validation_error,
                        path := "message_validation.validations.1.checks"
                    }
                ]},
                parse_and_check([
                    validation(<<"foo">>, [
                        schema_check(json, <<"a">>),
                        schema_check(json, <<"a">>),
                        schema_check(avro, <<"b">>),
                        schema_check(avro, <<"b">>)
                    ])
                ])
            )}
    ].
|