Просмотр исходного кода

feat(schema validation): check referenced schema names and types before changing config

Fixes https://emqx.atlassian.net/browse/EMQX-12368
Thales Macedo Garitezi 1 год назад
Родитель
Коммит
0f9c3b4cea

+ 1 - 1
apps/emqx_schema_registry/include/emqx_schema_registry.hrl

@@ -26,7 +26,7 @@
 -type encoded_data() :: iodata().
 -type decoded_data() :: map().
 
--type serde_type() :: emqx_schema_registry_serde:serde_type().
+-type serde_type() :: avro | protobuf | json.
 -type serde_opts() :: map().
 
 -record(serde, {

+ 11 - 3
apps/emqx_schema_validation/src/emqx_schema_validation.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_schema_validation).
 
+-feature(maybe_expr, enable).
+
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
@@ -41,7 +43,13 @@
 -define(TRACE_TAG, "SCHEMA_VALIDATION").
 
 -type validation_name() :: binary().
--type validation() :: _TODO.
+-type raw_validation() :: #{binary() => _}.
+-type validation() :: #{
+    name := validation_name(),
+    strategy := all_pass | any_pass,
+    failure_action := drop | disconnect | ignore,
+    log_failure := #{level := error | warning | notice | info | debug | none}
+}.
 
 -export_type([
     validation/0,
@@ -65,12 +73,12 @@ reorder(Order) ->
 lookup(Name) ->
     emqx_schema_validation_config:lookup(Name).
 
--spec insert(validation()) ->
+-spec insert(raw_validation()) ->
     {ok, _} | {error, _}.
 insert(Validation) ->
     emqx_schema_validation_config:insert(Validation).
 
--spec update(validation()) ->
+-spec update(raw_validation()) ->
     {ok, _} | {error, _}.
 update(Validation) ->
     emqx_schema_validation_config:update(Validation).

+ 133 - 42
apps/emqx_schema_validation/src/emqx_schema_validation_config.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_schema_validation_config).
 
+-feature(maybe_expr, enable).
+
 %% API
 -export([
     add_handler/0,
@@ -136,15 +138,25 @@ pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
 pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
     {ok, NewConfig}.
 
-post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
-    {Pos, Validation} = fetch_with_index(New, Name),
-    ok = emqx_schema_validation_registry:insert(Pos, Validation),
-    ok;
-post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
-    {_Pos, OldValidation} = fetch_with_index(Old, Name),
-    {Pos, NewValidation} = fetch_with_index(New, Name),
-    ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
-    ok;
+post_config_update(
+    ?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name} = RawValidation}, New, _Old, _AppEnvs
+) ->
+    maybe
+        ok ?= assert_referenced_schemas_exist(RawValidation),
+        {Pos, Validation} = fetch_with_index(New, Name),
+        ok = emqx_schema_validation_registry:insert(Pos, Validation),
+        ok
+    end;
+post_config_update(
+    ?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name} = RawValidation}, New, Old, _AppEnvs
+) ->
+    maybe
+        ok ?= assert_referenced_schemas_exist(RawValidation),
+        {_Pos, OldValidation} = fetch_with_index(Old, Name),
+        {Pos, NewValidation} = fetch_with_index(New, Name),
+        ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
+        ok
+    end;
 post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
     {Pos, Validation} = fetch_with_index(Old, Name),
     ok = emqx_schema_validation_registry:delete(Validation, Pos),
@@ -161,16 +173,19 @@ post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
             OldValidations,
             fun(#{name := N}) -> N end
         ),
-    NewValidations =
-        lists:map(
-            fun(#{name := Name}) ->
-                {Pos, Validation} = fetch_with_index(ResultingValidations, Name),
-                ok = emqx_schema_validation_registry:insert(Pos, Validation),
-                #{name => Name, pos => Pos}
-            end,
-            NewValidations0
-        ),
-    {ok, #{new_validations => NewValidations}};
+    maybe
+        ok ?= multi_assert_referenced_schemas_exist(NewValidations0),
+        NewValidations =
+            lists:map(
+                fun(#{name := Name}) ->
+                    {Pos, Validation} = fetch_with_index(ResultingValidations, Name),
+                    ok = emqx_schema_validation_registry:insert(Pos, Validation),
+                    #{name => Name, pos => Pos}
+                end,
+                NewValidations0
+            ),
+        {ok, #{new_validations => NewValidations}}
+    end;
 post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
     #{
         new_validations := NewValidations,
@@ -179,32 +194,46 @@ post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnv
     } = prepare_config_replace(Input, Old),
     #{validations := ResultingValidations} = ResultingConfig,
     #{validations := OldValidations} = Old,
-    lists:foreach(
-        fun(Name) ->
-            {Pos, Validation} = fetch_with_index(OldValidations, Name),
-            ok = emqx_schema_validation_registry:delete(Validation, Pos)
-        end,
-        DeletedValidations
-    ),
-    lists:foreach(
-        fun(Name) ->
-            {Pos, Validation} = fetch_with_index(ResultingValidations, Name),
-            ok = emqx_schema_validation_registry:insert(Pos, Validation)
-        end,
-        NewValidations
-    ),
-    ChangedValidations =
-        lists:map(
+    NewOrChangedValidationNames = NewValidations ++ ChangedValidations0,
+    maybe
+        ok ?=
+            multi_assert_referenced_schemas_exist(
+                lists:filter(
+                    fun(#{name := N}) ->
+                        lists:member(N, NewOrChangedValidationNames)
+                    end,
+                    ResultingValidations
+                )
+            ),
+        lists:foreach(
             fun(Name) ->
-                {_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
-                {Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
-                ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
-                #{name => Name, pos => Pos}
+                {Pos, Validation} = fetch_with_index(OldValidations, Name),
+                ok = emqx_schema_validation_registry:delete(Validation, Pos)
             end,
-            ChangedValidations0
+            DeletedValidations
         ),
-    ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations, OldValidations),
-    {ok, #{changed_validations => ChangedValidations}}.
+        lists:foreach(
+            fun(Name) ->
+                {Pos, Validation} = fetch_with_index(ResultingValidations, Name),
+                ok = emqx_schema_validation_registry:insert(Pos, Validation)
+            end,
+            NewValidations
+        ),
+        ChangedValidations =
+            lists:map(
+                fun(Name) ->
+                    {_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
+                    {Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
+                    ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
+                    #{name => Name, pos => Pos}
+                end,
+                ChangedValidations0
+            ),
+        ok = emqx_schema_validation_registry:reindex_positions(
+            ResultingValidations, OldValidations
+        ),
+        {ok, #{changed_validations => ChangedValidations}}
+    end.
 
 %%------------------------------------------------------------------------------
 %% `emqx_config_backup' API
@@ -388,3 +417,65 @@ prepare_config_replace(NewConfig, OldConfig) ->
         changed_validations => ChangedValidations0 ++ ChangedValidations1,
         deleted_validations => DeletedValidations
     }.
+
+-spec assert_referenced_schemas_exist(raw_validation()) -> ok | {error, map()}.
+assert_referenced_schemas_exist(RawValidation) ->
+    #{<<"checks">> := RawChecks} = RawValidation,
+    SchemasToCheck =
+        lists:filtermap(
+            fun
+                (#{<<"schema">> := SchemaName} = Check) ->
+                    %% so far, only protobuf has inner types
+                    InnerPath =
+                        case maps:find(<<"message_type">>, Check) of
+                            {ok, MessageType} -> [MessageType];
+                            error -> []
+                        end,
+                    {true, {SchemaName, InnerPath}};
+                (_Check) ->
+                    false
+            end,
+            RawChecks
+        ),
+    do_assert_referenced_schemas_exist(SchemasToCheck).
+
+do_assert_referenced_schemas_exist(SchemasToCheck) ->
+    MissingSchemas =
+        lists:foldl(
+            fun({SchemaName, InnerPath}, Acc) ->
+                case emqx_schema_registry:is_existing_type(SchemaName, InnerPath) of
+                    true ->
+                        Acc;
+                    false ->
+                        [[SchemaName | InnerPath] | Acc]
+                end
+            end,
+            [],
+            SchemasToCheck
+        ),
+    case MissingSchemas of
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, #{missing_schemas => MissingSchemas}}
+    end.
+
+-spec multi_assert_referenced_schemas_exist([validation()]) -> ok | {error, map()}.
+multi_assert_referenced_schemas_exist(Validations) ->
+    SchemasToCheck =
+        lists:filtermap(
+            fun
+                (#{schema := SchemaName} = Check) ->
+                    %% so far, only protobuf has inner types
+                    InnerPath =
+                        case maps:find(message_type, Check) of
+                            {ok, MessageType} -> [MessageType];
+                            error -> []
+                        end,
+                    {true, {SchemaName, InnerPath}};
+                (_Check) ->
+                    false
+            end,
+            [Check || #{checks := Checks} <- Validations, Check <- Checks]
+        ),
+    do_assert_referenced_schemas_exist(SchemasToCheck).

+ 104 - 23
apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl

@@ -356,25 +356,36 @@ protobuf_invalid_payloads() ->
     ].
 
 protobuf_create_serde(SerdeName) ->
-    Source =
-        <<
-            "message Person {\n"
-            "     required string name = 1;\n"
-            "     required int32 id = 2;\n"
-            "     optional string email = 3;\n"
-            "  }\n"
-            "message UnionValue {\n"
-            "    oneof u {\n"
-            "        int32  a = 1;\n"
-            "        string b = 2;\n"
-            "    }\n"
-            "}"
-        >>,
+    protobuf_upsert_serde(SerdeName, <<"Person">>).
+
+protobuf_upsert_serde(SerdeName, MessageType) ->
+    Source = protobuf_source(MessageType),
     Schema = #{type => protobuf, source => Source},
     ok = emqx_schema_registry:add_schema(SerdeName, Schema),
     on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
     ok.
 
+protobuf_source(MessageType) ->
+    iolist_to_binary(
+        [
+            <<"message ">>,
+            MessageType,
+            <<" {\n">>,
+            <<
+                "     required string name = 1;\n"
+                "     required int32 id = 2;\n"
+                "     optional string email = 3;\n"
+                "  }\n"
+                "message UnionValue {\n"
+                "    oneof u {\n"
+                "        int32  a = 1;\n"
+                "        string b = 2;\n"
+                "    }\n"
+                "}"
+            >>
+        ]
+    ).
+
 %% Checks that the internal order in the registry/index matches expectation.
 assert_index_order(ExpectedOrder, Topic, Comment) ->
     ?assertEqual(
@@ -1041,6 +1052,7 @@ t_duplicated_schema_checks(_Config) ->
     Name1 = <<"foo">>,
     SerdeName = <<"myserde">>,
     Check = schema_check(json, SerdeName),
+    json_create_serde(SerdeName),
 
     Validation1 = validation(Name1, [Check, sql_check(), Check]),
     ?assertMatch({400, _}, insert(Validation1)),
@@ -1130,18 +1142,87 @@ t_multiple_validations(_Config) ->
 
     ok.
 
+%% Test that we validate schema registry serde existence when using the HTTP API.
 t_schema_check_non_existent_serde(_Config) ->
     SerdeName = <<"idontexist">>,
     Name1 = <<"foo">>,
+
     Check1 = schema_check(json, SerdeName),
     Validation1 = validation(Name1, [Check1]),
-    {201, _} = insert(Validation1),
+    ?assertMatch({400, _}, insert(Validation1)),
 
-    C = connect(<<"c1">>),
-    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+    Check2 = schema_check(avro, SerdeName),
+    Validation2 = validation(Name1, [Check2]),
+    ?assertMatch({400, _}, insert(Validation2)),
 
-    ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}),
-    ?assertNotReceive({publish, _}),
+    MessageType = <<"idontexisteither">>,
+    Check3 = schema_check(protobuf, SerdeName, #{<<"message_type">> => MessageType}),
+    Validation3 = validation(Name1, [Check3]),
+    ?assertMatch({400, _}, insert(Validation3)),
+
+    protobuf_create_serde(SerdeName),
+    %% Still fails because the referenced message type doesn't exist.
+    ?assertMatch({400, _}, insert(Validation3)),
+
+    ok.
+
+%% Test that we validate schema registry serde existence when loading configs.
+t_schema_check_non_existent_serde_load_config(_Config) ->
+    Name1 = <<"1">>,
+    SerdeName1 = <<"serde1">>,
+    MessageType1 = <<"mt">>,
+    Check1A = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MessageType1}),
+    Validation1A = validation(Name1, [Check1A]),
+    protobuf_upsert_serde(SerdeName1, MessageType1),
+    {201, _} = insert(Validation1A),
+    Name2 = <<"2">>,
+    SerdeName2 = <<"serde2">>,
+    Check2A = schema_check(json, SerdeName2),
+    Validation2A = validation(Name2, [Check2A]),
+    json_create_serde(SerdeName2),
+    {201, _} = insert(Validation2A),
+
+    %% Config to load
+    %% Will replace existing config
+    MissingMessageType = <<"missing_mt">>,
+    Check1B = schema_check(protobuf, SerdeName1, #{<<"message_type">> => MissingMessageType}),
+    Validation1B = validation(Name1, [Check1B]),
+
+    %% Will replace existing config
+    MissingSerdeName1 = <<"missing1">>,
+    Check2B = schema_check(json, MissingSerdeName1),
+    Validation2B = validation(Name2, [Check2B]),
+
+    %% New validation; should be appended
+    Name3 = <<"3">>,
+    MissingSerdeName2 = <<"missing2">>,
+    Check3 = schema_check(avro, MissingSerdeName2),
+    Validation3 = validation(Name3, [Check3]),
+
+    ConfRootBin = <<"schema_validation">>,
+    ConfigToLoad1 = #{
+        ConfRootBin => #{
+            <<"validations">> => [Validation1B, Validation2B, Validation3]
+        }
+    },
+    ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})),
+    %% Merge
+    ResMerge = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge}),
+    ?assertMatch({error, _}, ResMerge),
+    {error, ErrorMessage1} = ResMerge,
+    ?assertEqual(match, re:run(ErrorMessage1, <<"missing_schemas">>, [{capture, none}])),
+    ?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName1, [{capture, none}])),
+    ?assertEqual(match, re:run(ErrorMessage1, MissingSerdeName2, [{capture, none}])),
+    ?assertEqual(match, re:run(ErrorMessage1, MissingMessageType, [{capture, none}])),
+
+    %% Replace
+    ResReplace = emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => replace}),
+    ?assertMatch({error, _}, ResReplace),
+    {error, ErrorMessage2} = ResReplace,
+    ?assertEqual(match, re:run(ErrorMessage2, <<"missing_schemas">>, [{capture, none}])),
+    ?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName1, [{capture, none}])),
+    ?assertEqual(match, re:run(ErrorMessage2, MissingSerdeName2, [{capture, none}])),
+    ?assertEqual(match, re:run(ErrorMessage2, MissingMessageType, [{capture, none}])),
 
     ok.
 
@@ -1232,16 +1313,16 @@ t_schema_check_protobuf(_Config) ->
     ),
 
     %% Bad config: unknown message name
-    Check2 = schema_check(protobuf, SerdeName, #{<<"message_type">> => <<"idontexist">>}),
-    Validation2 = validation(Name1, [Check2]),
-    {200, _} = update(Validation2),
+    %% Schema updated to use another message type after validation was created
+    OtherMessageType = <<"NewPersonType">>,
+    protobuf_upsert_serde(SerdeName, OtherMessageType),
 
     lists:foreach(
         fun(Payload) ->
             ok = publish(C, <<"t/1">>, {raw, Payload}),
             ?assertNotReceive({publish, _})
         end,
-        protobuf_valid_payloads(SerdeName, MessageType)
+        protobuf_valid_payloads(SerdeName, OtherMessageType)
     ),
 
     ok.

+ 1 - 0
changes/ee/feat-13210.en.md

@@ -0,0 +1 @@
+Now, when inserting or updating a Schema Validation, EMQX will check if the referenced schemas and message types exist in Schema Registry.