Przeglądaj źródła

feat(external schema registry): add config load support

Follow up to https://github.com/emqx/emqx/pull/13804
Thales Macedo Garitezi 1 rok temu
rodzic
commit
2e1cb3d355

+ 1 - 0
apps/emqx_schema_registry/include/emqx_schema_registry.hrl

@@ -6,6 +6,7 @@
 -define(EMQX_SCHEMA_REGISTRY_HRL, true).
 
 -define(CONF_KEY_ROOT, schema_registry).
+-define(CONF_KEY_ROOT_BIN, <<"schema_registry">>).
 -define(CONF_KEY_PATH, [?CONF_KEY_ROOT]).
 
 %% Note: this has the `_ee_' segment for backwards compatibility.

+ 93 - 35
apps/emqx_schema_registry/src/emqx_schema_registry_config.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_schema_registry_config).
 
+-feature(maybe_expr, enable).
+
 -include("emqx_schema_registry.hrl").
 
 %% API
@@ -162,10 +164,60 @@ post_config_update(
     _Old,
     _AppEnvs
 ) ->
+    do_upsert_external_registry(Name, NewConfig),
+    ok;
+post_config_update([?CONF_KEY_ROOT], _Cmd, NewConf, OldConf, _AppEnvs) ->
+    Context0 = #{},
+    maybe
+        {ok, Context1} ?= handle_import_schemas(Context0, NewConf, OldConf),
+        {ok, _Context2} ?= handle_import_external_registries(Context1, NewConf, OldConf)
+    end;
+post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
+    {ok, NewConf}.
+
+%%------------------------------------------------------------------------------
+%% `emqx_config_backup' API
+%%------------------------------------------------------------------------------
+
+import_config(#{?CONF_KEY_ROOT_BIN := RawConf0}) ->
+    Result = emqx_conf:update(
+        [?CONF_KEY_ROOT],
+        RawConf0,
+        #{override_to => cluster, rawconf_with_defaults => true}
+    ),
+    case Result of
+        {error, Reason} ->
+            {error, #{root_key => ?CONF_KEY_ROOT, reason => Reason}};
+        {ok, Res} ->
+            ChangedPaths = emqx_utils_maps:deep_get(
+                [post_config_update, ?MODULE, changed_paths],
+                Res,
+                []
+            ),
+            {ok, #{root_key => ?CONF_KEY_ROOT, changed => ChangedPaths}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => ?CONF_KEY_ROOT, changed => []}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+add_external_registry(Name, Config) ->
+    ok = emqx_schema_registry_external:add(Name, Config),
+    ok.
+
+remove_external_registry(Name) ->
+    ok = emqx_schema_registry_external:remove(Name),
+    ok.
+
+%% receives parsed config with atom keys
+do_upsert_external_registry(Name, NewConfig) ->
     remove_external_registry(Name),
     add_external_registry(Name, NewConfig),
-    ok;
-post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, OldConf, _AppEnvs) ->
+    ok.
+
+handle_import_schemas(Context0, #{schemas := NewSchemas}, OldConf) ->
     OldSchemas = maps:get(schemas, OldConf, #{}),
     #{
         added := Added,
@@ -187,40 +239,46 @@ post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, Old
     ]),
     case emqx_schema_registry:build_serdes(SchemasToBuild) of
         ok ->
-            {ok, NewConf};
+            ChangedPaths = [
+                [?CONF_KEY_ROOT, schemas, N]
+             || {N, _} <- SchemasToBuild
+            ],
+            Context = maps:update_with(
+                changed_paths,
+                fun(Ps) -> ChangedPaths ++ Ps end,
+                ChangedPaths,
+                Context0
+            ),
+            {ok, Context};
         {error, Reason, SerdesToRollback} ->
             lists:foreach(fun emqx_schema_registry:ensure_serde_absent/1, SerdesToRollback),
             {error, Reason}
-    end;
-post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
-    {ok, NewConf}.
-
-%%------------------------------------------------------------------------------
-%% `emqx_config_backup' API
-%%------------------------------------------------------------------------------
-
-import_config(#{<<"schema_registry">> := #{<<"schemas">> := Schemas} = SchemaRegConf}) ->
-    OldSchemas = emqx:get_raw_config([?CONF_KEY_ROOT, schemas], #{}),
-    SchemaRegConf1 = SchemaRegConf#{<<"schemas">> => maps:merge(OldSchemas, Schemas)},
-    case emqx_conf:update(?CONF_KEY_PATH, SchemaRegConf1, #{override_to => cluster}) of
-        {ok, #{raw_config := #{<<"schemas">> := NewRawSchemas}}} ->
-            Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawSchemas, OldSchemas)),
-            ChangedPaths = [[?CONF_KEY_ROOT, schemas, Name] || Name <- maps:keys(Changed)],
-            {ok, #{root_key => ?CONF_KEY_ROOT, changed => ChangedPaths}};
-        Error ->
-            {error, #{root_key => ?CONF_KEY_ROOT, reason => Error}}
-    end;
-import_config(_RawConf) ->
-    {ok, #{root_key => ?CONF_KEY_ROOT, changed => []}}.
-
-%%------------------------------------------------------------------------------
-%% Internal fns
-%%------------------------------------------------------------------------------
-
-add_external_registry(Name, Config) ->
-    ok = emqx_schema_registry_external:add(Name, Config),
-    ok.
+    end.
 
-remove_external_registry(Name) ->
-    ok = emqx_schema_registry_external:remove(Name),
-    ok.
+handle_import_external_registries(Context0, NewConf, OldConf) ->
+    New = maps:get(external, NewConf, #{}),
+    Old = maps:get(external, OldConf, #{}),
+    #{
+        added := Added,
+        changed := Changed0,
+        removed := Removed
+    } = emqx_utils_maps:diff_maps(New, Old),
+    Changed = maps:map(fun(_N, {_Old, New0}) -> New0 end, Changed0),
+    RemovedNames = maps:keys(Removed),
+    RegistriesToUpsert = maps:to_list(maps:merge(Changed, Added)),
+    lists:foreach(fun remove_external_registry/1, RemovedNames),
+    lists:foreach(
+        fun({Name, Cfg}) -> do_upsert_external_registry(Name, Cfg) end,
+        RegistriesToUpsert
+    ),
+    ChangedPaths = [
+        [?CONF_KEY_ROOT, external, N]
+     || {N, _} <- RegistriesToUpsert
+    ],
+    Context = maps:update_with(
+        changed_paths,
+        fun(Ps) -> ChangedPaths ++ Ps end,
+        ChangedPaths,
+        Context0
+    ),
+    {ok, Context}.

+ 107 - 8
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -23,14 +23,21 @@ all() ->
         {group, avro},
         {group, protobuf},
         {group, json}
-    ] ++ sparkplug_tests().
+    ] ++ sparkplug_tests() ++ only_once_tcs().
 
 groups() ->
-    AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
+    OnlyOnceTCs = only_once_tcs(),
+    AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- (sparkplug_tests() ++ OnlyOnceTCs),
     ProtobufOnlyTCs = protobuf_only_tcs(),
     TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
     [{avro, TCs}, {json, TCs}, {protobuf, AllTCsExceptSP}].
 
+only_once_tcs() ->
+    [
+        t_import_config,
+        t_external_registry_load_config
+    ].
+
 protobuf_only_tcs() ->
     [
         t_protobuf_union_encode,
@@ -85,6 +92,7 @@ end_per_testcase(_TestCase, _Config) ->
     ok = snabbkaffe:stop(),
     emqx_common_test_helpers:call_janitor(),
     clear_schemas(),
+    clear_external_registries(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -345,6 +353,14 @@ clear_schemas() ->
         emqx_schema_registry:list_schemas()
     ).
 
+clear_external_registries() ->
+    maps:foreach(
+        fun(Name, _Schema) ->
+            ok = emqx_schema_registry_config:delete_external_registry(Name)
+        end,
+        emqx_schema_registry_config:list_external_registries()
+    ).
+
 receive_action_results() ->
     receive
         {action, #{data := _} = Res} ->
@@ -753,10 +769,93 @@ t_cluster_serde_build(Config) ->
     ),
     ok.
 
-%% Verifies that importing in both `merge' and `replace' modes work with external registries
-%% t_external_registry_import_config(_Config) ->
-%%     ct:fail(todo),
-%%     ok.
+%% Verifies that importing in both `merge' and `replace' modes work with external
+%% registries, when importing configurations from the CLI interface.
+t_external_registry_load_config(_Config) ->
+    %% Existing config
+    Name0 = <<"preexisting">>,
+    Config0 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(),
+    {201, _} = emqx_schema_registry_http_api_SUITE:create_external_registry(Config0#{
+        <<"name">> => Name0
+    }),
+    [_] = emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name0),
+
+    %% Config to load
+    %% Will update existing config
+    Name1 = Name0,
+    URL1 = <<"http://new_url:8081">>,
+    Config1 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(#{
+        <<"url">> => URL1
+    }),
+    %% New config
+    Name2 = <<"new">>,
+    URL2 = <<"http://yet_another_url:8081">>,
+    Config2 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(#{
+        <<"url">> => URL2
+    }),
+    ConfigToLoad1 = #{
+        <<"schema_registry">> => #{
+            <<"external">> => #{
+                Name1 => Config1,
+                Name2 => Config2
+            }
+        }
+    },
+    ConfigToLoad1Bin = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})),
+    ?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoad1Bin, #{mode => merge})),
+
+    Path = [schema_registry, external],
+    PathBin = [emqx_utils_conv:bin(PS) || PS <- Path],
+    Name1Atom = binary_to_atom(Name1),
+    Name2Atom = binary_to_atom(Name2),
+    ?assertMatch(
+        #{
+            Name1 := #{<<"url">> := URL1},
+            Name2 := #{<<"url">> := URL2}
+        },
+        emqx_config:get_raw(PathBin)
+    ),
+    ?assertMatch(
+        #{
+            Name1Atom := #{url := URL1},
+            Name2Atom := #{url := URL2}
+        },
+        emqx_config:get(Path)
+    ),
+    %% Correctly starts new processes
+    ?assertMatch([_], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name1)),
+    ?assertMatch([_], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name2)),
+
+    %% New config; will replace everything
+    Name3 = Name0,
+    URL3 = <<"http://final_url:8081">>,
+    Config3 = emqx_schema_registry_http_api_SUITE:confluent_schema_registry_with_basic_auth(#{
+        <<"url">> => URL3
+    }),
+    ConfigToLoad2 = #{
+        <<"schema_registry">> => #{
+            <<"external">> => #{
+                Name3 => Config3
+            }
+        }
+    },
+    ConfigToLoad2Bin = iolist_to_binary(hocon_pp:do(ConfigToLoad2, #{})),
+    ?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoad2Bin, #{mode => replace})),
+
+    Name3Atom = binary_to_atom(Name3),
+    ?assertMatch(
+        #{Name3 := #{<<"url">> := URL3}},
+        emqx_config:get_raw(PathBin)
+    ),
+    ?assertMatch(
+        #{Name3Atom := #{url := URL3}},
+        emqx_config:get(Path)
+    ),
+    %% Correctly stops old processes
+    ?assertMatch([_], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name3)),
+    ?assertMatch([], emqx_schema_registry_http_api_SUITE:find_external_registry_worker(Name2)),
+
+    ok.
 
 t_import_config(_Config) ->
     RawConf = #{
@@ -789,9 +888,9 @@ t_import_config(_Config) ->
         RawConf,
         <<"Updated description">>
     ),
-    Path = [schema_registry, schemas, <<"my_avro_schema">>],
+    Path = [schema_registry, schemas, my_avro_schema],
     ?assertEqual(
-        {ok, #{root_key => schema_registry, changed => []}},
+        {ok, #{root_key => schema_registry, changed => [Path]}},
         emqx_schema_registry_config:import_config(RawConf)
     ),
     ?assertEqual(