Просмотр исходного кода

feat(schema registry): add check for inner types

Currently, only `protobuf` has any.
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
03b226248a

+ 1 - 1
apps/emqx_schema_registry/include/emqx_schema_registry.hrl

@@ -26,7 +26,7 @@
 -type encoded_data() :: iodata().
 -type encoded_data() :: iodata().
 -type decoded_data() :: map().
 -type decoded_data() :: map().
 
 
--type serde_type() :: avro | protobuf | json.
+-type serde_type() :: emqx_schema_registry_serde:serde_type().
 -type serde_opts() :: map().
 -type serde_opts() :: map().
 
 
 -record(serde, {
 -record(serde, {

+ 11 - 0
apps/emqx_schema_registry/src/emqx_schema_registry.erl

@@ -16,6 +16,8 @@
     start_link/0,
     start_link/0,
     add_schema/2,
     add_schema/2,
     get_schema/1,
     get_schema/1,
+    is_existing_type/1,
+    is_existing_type/2,
     delete_schema/1,
     delete_schema/1,
     list_schemas/0
     list_schemas/0
 ]).
 ]).
@@ -52,6 +54,7 @@
 %% API
 %% API
 %%-------------------------------------------------------------------------------------------------
 %%-------------------------------------------------------------------------------------------------
 
 
+-spec start_link() -> gen_server:start_ret().
 start_link() ->
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
@@ -64,6 +67,14 @@ get_serde(SchemaName) ->
             {ok, Serde}
             {ok, Serde}
     end.
     end.
 
 
+-spec is_existing_type(schema_name()) -> boolean().
+is_existing_type(SchemaName) ->
+    is_existing_type(SchemaName, []).
+
+-spec is_existing_type(schema_name(), [binary()]) -> boolean().
+is_existing_type(SchemaName, Path) ->
+    emqx_schema_registry_serde:is_existing_type(SchemaName, Path).
+
 -spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
 -spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
 get_schema(SchemaName) ->
 get_schema(SchemaName) ->
     case
     case

+ 45 - 0
apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 -module(emqx_schema_registry_serde).
 -module(emqx_schema_registry_serde).
 
 
+-feature(maybe_expr, enable).
+
 -behaviour(emqx_rule_funcs).
 -behaviour(emqx_rule_funcs).
 
 
 -include("emqx_schema_registry.hrl").
 -include("emqx_schema_registry.hrl").
@@ -14,6 +16,8 @@
     make_serde/3,
     make_serde/3,
     handle_rule_function/2,
     handle_rule_function/2,
     schema_check/3,
     schema_check/3,
+    is_existing_type/1,
+    is_existing_type/2,
     destroy/1
     destroy/1
 ]).
 ]).
 
 
@@ -27,6 +31,10 @@
     eval_encode/2
     eval_encode/2
 ]).
 ]).
 
 
+%%------------------------------------------------------------------------------
+%% Type definitions
+%%------------------------------------------------------------------------------
+
 -define(BOOL(SerdeName, EXPR),
 -define(BOOL(SerdeName, EXPR),
     try
     try
         _ = EXPR,
         _ = EXPR,
@@ -38,10 +46,28 @@
     end
     end
 ).
 ).
 
 
+-type eval_context() :: term().
+
+-export_type([serde_type/0]).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% API
 %% API
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
+-spec is_existing_type(schema_name()) -> boolean().
+is_existing_type(SchemaName) ->
+    is_existing_type(SchemaName, []).
+
+-spec is_existing_type(schema_name(), [binary()]) -> boolean().
+is_existing_type(SchemaName, Path) ->
+    maybe
+        {ok, #serde{type = SerdeType, eval_context = EvalContext}} ?=
+            emqx_schema_registry:get_serde(SchemaName),
+        has_inner_type(SerdeType, EvalContext, Path)
+    else
+        _ -> false
+    end.
+
 -spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
 -spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
 handle_rule_function(sparkplug_decode, [Data]) ->
 handle_rule_function(sparkplug_decode, [Data]) ->
     handle_rule_function(
     handle_rule_function(
@@ -338,3 +364,22 @@ unload_code(SerdeMod) ->
     _ = code:purge(SerdeMod),
     _ = code:purge(SerdeMod),
     _ = code:delete(SerdeMod),
     _ = code:delete(SerdeMod),
     ok.
     ok.
+
+-spec has_inner_type(serde_type(), eval_context(), [binary()]) ->
+    boolean().
+has_inner_type(protobuf, _SerdeMod, [_, _ | _]) ->
+    %% Protobuf only has one level of message types.
+    false;
+has_inner_type(protobuf, SerdeMod, [MessageTypeBin]) ->
+    try apply(SerdeMod, get_msg_names, []) of
+        Names ->
+            lists:member(MessageTypeBin, [atom_to_binary(N, utf8) || N <- Names])
+    catch
+        _:_ ->
+            false
+    end;
+has_inner_type(_SerdeType, _EvalContext, []) ->
+    %% This function is only called if we already found a serde, so the root does exist.
+    true;
+has_inner_type(_SerdeType, _EvalContext, _Path) ->
+    false.

+ 37 - 6
apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl

@@ -14,7 +14,6 @@
 
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 
--define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
 -define(INVALID_JSON, #{
 -define(INVALID_JSON, #{
     reason := #{expected := "emqx_schema:json_binary()"},
     reason := #{expected := "emqx_schema:json_binary()"},
     kind := validation_error
     kind := validation_error
@@ -28,12 +27,20 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
     emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
-    emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema),
-    emqx_mgmt_api_test_util:init_suite(?APPS),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_schema_registry,
+            emqx_rule_engine
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
 
 
-end_per_suite(_Config) ->
-    emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
     ok.
     ok.
 init_per_testcase(_TestCase, Config) ->
 init_per_testcase(_TestCase, Config) ->
     Config.
     Config.
@@ -240,3 +247,27 @@ t_json_validation(_Config) ->
     ?assertNot(F(schema_check, <<"{\"bar\": 2}">>)),
     ?assertNot(F(schema_check, <<"{\"bar\": 2}">>)),
     ?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)),
     ?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)),
     ok.
     ok.
+
+t_is_existing_type(_Config) ->
+    JsonName = <<"myjson">>,
+    ?assertNot(emqx_schema_registry:is_existing_type(JsonName)),
+    ok = emqx_schema_registry:add_schema(JsonName, schema_params(json)),
+    AvroName = <<"myavro">>,
+    ?assertNot(emqx_schema_registry:is_existing_type(AvroName)),
+    ok = emqx_schema_registry:add_schema(AvroName, schema_params(avro)),
+    ProtobufName = <<"myprotobuf">>,
+    MessageType = <<"Person">>,
+    ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName)),
+    ok = emqx_schema_registry:add_schema(ProtobufName, schema_params(protobuf)),
+    %% JSON Schema: no inner names
+    ?assert(emqx_schema_registry:is_existing_type(JsonName)),
+    ?assertNot(emqx_schema_registry:is_existing_type(JsonName, [JsonName])),
+    %% Avro: no inner names
+    ?assert(emqx_schema_registry:is_existing_type(AvroName)),
+    ?assertNot(emqx_schema_registry:is_existing_type(AvroName, [AvroName])),
+    %% Protobuf: one level of message types
+    ?assert(emqx_schema_registry:is_existing_type(ProtobufName)),
+    ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [ProtobufName])),
+    ?assert(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType])),
+    ?assertNot(emqx_schema_registry:is_existing_type(ProtobufName, [MessageType, MessageType])),
+    ok.