|
|
@@ -1,7 +1,7 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
%%--------------------------------------------------------------------
|
|
|
--module(emqx_ee_schema_registry_SUITE).
|
|
|
+-module(emqx_schema_registry_SUITE).
|
|
|
|
|
|
-compile(export_all).
|
|
|
-compile(nowarn_export_all).
|
|
|
@@ -10,11 +10,11 @@
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
--include("emqx_ee_schema_registry.hrl").
|
|
|
+-include("emqx_schema_registry.hrl").
|
|
|
|
|
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
|
|
|
--define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]).
|
|
|
+-define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% CT boilerplate
|
|
|
@@ -46,7 +46,7 @@ sparkplug_tests() ->
|
|
|
].
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
- emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
|
|
|
+ emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema),
|
|
|
emqx_mgmt_api_test_util:init_suite(?APPS),
|
|
|
Config.
|
|
|
|
|
|
@@ -156,7 +156,7 @@ schema_params(protobuf) ->
|
|
|
|
|
|
create_serde(SerdeType, SerdeName) ->
|
|
|
Schema = schema_params(SerdeType),
|
|
|
- ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema),
|
|
|
+ ok = emqx_schema_registry:add_schema(SerdeName, Schema),
|
|
|
ok.
|
|
|
|
|
|
test_params_for(avro, encode_decode1) ->
|
|
|
@@ -313,9 +313,9 @@ test_params_for(Type, Name) ->
|
|
|
clear_schemas() ->
|
|
|
maps:foreach(
|
|
|
fun(Name, _Schema) ->
|
|
|
- ok = emqx_ee_schema_registry:delete_schema(Name)
|
|
|
+ ok = emqx_schema_registry:delete_schema(Name)
|
|
|
end,
|
|
|
- emqx_ee_schema_registry:list_schemas()
|
|
|
+ emqx_schema_registry:list_schemas()
|
|
|
).
|
|
|
|
|
|
receive_action_results() ->
|
|
|
@@ -367,7 +367,7 @@ cluster(Config) ->
|
|
|
%% need to restart schema registry app in the tests so
|
|
|
%% that it re-registers the config handler that is lost
|
|
|
%% when emqx_conf restarts during join.
|
|
|
- {env, [{emqx_machine, applications, [emqx_ee_schema_registry]}]},
|
|
|
+ {env, [{emqx_machine, applications, [emqx_schema_registry]}]},
|
|
|
{load_apps, [emqx_machine | ?APPS]},
|
|
|
{env_handler, fun
|
|
|
(emqx) ->
|
|
|
@@ -453,11 +453,11 @@ protobuf_unique_cache_hit_spec(_Res, _Trace) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
t_unknown_calls(_Config) ->
|
|
|
- Ref = monitor(process, emqx_ee_schema_registry),
|
|
|
+ Ref = monitor(process, emqx_schema_registry),
|
|
|
%% for coverage
|
|
|
- emqx_ee_schema_registry ! unknown,
|
|
|
- gen_server:cast(emqx_ee_schema_registry, unknown),
|
|
|
- ?assertEqual({error, unknown_call}, gen_server:call(emqx_ee_schema_registry, unknown)),
|
|
|
+ emqx_schema_registry ! unknown,
|
|
|
+ gen_server:cast(emqx_schema_registry, unknown),
|
|
|
+ ?assertEqual({error, unknown_call}, gen_server:call(emqx_schema_registry, unknown)),
|
|
|
receive
|
|
|
{'DOWN', Ref, process, _, _} ->
|
|
|
ct:fail("registry shouldn't have died")
|
|
|
@@ -489,7 +489,7 @@ t_delete_serde(Config) ->
|
|
|
ok = create_serde(SerdeType, SerdeName),
|
|
|
{ok, {ok, _}} =
|
|
|
?wait_async_action(
|
|
|
- emqx_ee_schema_registry:delete_schema(SerdeName),
|
|
|
+ emqx_schema_registry:delete_schema(SerdeName),
|
|
|
#{?snk_kind := schema_registry_serdes_deleted},
|
|
|
1_000
|
|
|
),
|
|
|
@@ -522,7 +522,7 @@ t_encode(Config) ->
|
|
|
Published
|
|
|
),
|
|
|
#{payload := Encoded} = Published,
|
|
|
- {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
|
|
+ {ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName),
|
|
|
?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
|
|
|
ok.
|
|
|
|
|
|
@@ -536,7 +536,7 @@ t_decode(Config) ->
|
|
|
extra_args := ExtraArgs
|
|
|
} = test_params_for(SerdeType, decode1),
|
|
|
{ok, _} = create_rule_http(#{sql => SQL}),
|
|
|
- {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
|
|
+ {ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName),
|
|
|
EncodedBin = apply(Serializer, [Payload | ExtraArgs]),
|
|
|
emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
|
|
|
Published = receive_published(?LINE),
|
|
|
@@ -559,7 +559,7 @@ t_protobuf_union_encode(Config) ->
|
|
|
extra_args := ExtraArgs
|
|
|
} = test_params_for(SerdeType, union1),
|
|
|
{ok, _} = create_rule_http(#{sql => SQL}),
|
|
|
- {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
|
|
+ {ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName),
|
|
|
|
|
|
EncodedBinA = apply(Serializer, [PayloadA | ExtraArgs]),
|
|
|
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
|
|
|
@@ -594,7 +594,7 @@ t_protobuf_union_decode(Config) ->
|
|
|
extra_args := ExtraArgs
|
|
|
} = test_params_for(SerdeType, union2),
|
|
|
{ok, _} = create_rule_http(#{sql => SQL}),
|
|
|
- {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
|
|
+ {ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName),
|
|
|
|
|
|
EncodedBinA = emqx_utils_json:encode(PayloadA),
|
|
|
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
|
|
|
@@ -639,9 +639,9 @@ t_fail_rollback(Config) ->
|
|
|
#{}
|
|
|
)
|
|
|
),
|
|
|
- ?assertMatch({ok, #{name := <<"a">>}}, emqx_ee_schema_registry:get_serde(<<"a">>)),
|
|
|
+ ?assertMatch({ok, #{name := <<"a">>}}, emqx_schema_registry:get_serde(<<"a">>)),
|
|
|
%% no z serdes should be in the table
|
|
|
- ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)),
|
|
|
+ ?assertEqual({error, not_found}, emqx_schema_registry:get_serde(<<"z">>)),
|
|
|
ok.
|
|
|
|
|
|
t_cluster_serde_build(Config) ->
|
|
|
@@ -660,13 +660,13 @@ t_cluster_serde_build(Config) ->
|
|
|
wait_for_cluster_rpc(N2),
|
|
|
?assertEqual(
|
|
|
ok,
|
|
|
- erpc:call(N2, emqx_ee_schema_registry, add_schema, [SerdeName, Schema])
|
|
|
+ erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema])
|
|
|
),
|
|
|
%% check that we can serialize/deserialize in all nodes
|
|
|
lists:foreach(
|
|
|
fun(N) ->
|
|
|
erpc:call(N, fun() ->
|
|
|
- Res0 = emqx_ee_schema_registry:get_serde(SerdeName),
|
|
|
+ Res0 = emqx_schema_registry:get_serde(SerdeName),
|
|
|
?assertMatch({ok, #{}}, Res0, #{node => N}),
|
|
|
{ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0,
|
|
|
?assertEqual(
|
|
|
@@ -691,7 +691,7 @@ t_cluster_serde_build(Config) ->
|
|
|
),
|
|
|
?assertEqual(
|
|
|
ok,
|
|
|
- erpc:call(N1, emqx_ee_schema_registry, delete_schema, [SerdeName])
|
|
|
+ erpc:call(N1, emqx_schema_registry, delete_schema, [SerdeName])
|
|
|
),
|
|
|
{ok, _} = snabbkaffe:receive_events(SRef1),
|
|
|
lists:foreach(
|
|
|
@@ -699,7 +699,7 @@ t_cluster_serde_build(Config) ->
|
|
|
erpc:call(N, fun() ->
|
|
|
?assertMatch(
|
|
|
{error, not_found},
|
|
|
- emqx_ee_schema_registry:get_serde(SerdeName),
|
|
|
+ emqx_schema_registry:get_serde(SerdeName),
|
|
|
#{node => N}
|
|
|
),
|
|
|
ok
|
|
|
@@ -726,7 +726,15 @@ t_import_config(_Config) ->
|
|
|
#{
|
|
|
<<"description">> => <<"My Avro Schema">>,
|
|
|
<<"source">> =>
|
|
|
- <<"{\"type\":\"record\",\"fields\":[{\"type\":\"int\",\"name\":\"i\"},{\"type\":\"string\",\"name\":\"s\"}]}">>,
|
|
|
+ emqx_utils_json:encode(
|
|
|
+ #{
|
|
|
+ type => <<"record">>,
|
|
|
+ fields => [
|
|
|
+ #{type => <<"int">>, name => <<"i">>},
|
|
|
+ #{type => <<"string">>, name => <<"s">>}
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ ),
|
|
|
<<"type">> => <<"avro">>
|
|
|
}
|
|
|
}
|
|
|
@@ -740,15 +748,21 @@ t_import_config(_Config) ->
|
|
|
Path = [schema_registry, schemas, <<"my_avro_schema">>],
|
|
|
?assertEqual(
|
|
|
{ok, #{root_key => schema_registry, changed => []}},
|
|
|
- emqx_ee_schema_registry:import_config(RawConf)
|
|
|
+ emqx_schema_registry:import_config(RawConf)
|
|
|
),
|
|
|
?assertEqual(
|
|
|
{ok, #{root_key => schema_registry, changed => [Path]}},
|
|
|
- emqx_ee_schema_registry:import_config(RawConf1)
|
|
|
+ emqx_schema_registry:import_config(RawConf1)
|
|
|
).
|
|
|
|
|
|
sparkplug_example_data_base64() ->
|
|
|
- <<"CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgDEikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA==">>.
|
|
|
+ <<
|
|
|
+ "CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgD"
|
|
|
+ "EikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3Vu"
|
|
|
+ "dGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3Jv"
|
|
|
+ "dXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50"
|
|
|
+ "ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA=="
|
|
|
+ >>.
|
|
|
|
|
|
sparkplug_example_data() ->
|
|
|
#{
|