Parcourir la source

feat: implement configuration and user data export/import CLI

Closes: EMQX-9203
Serge Tupchii il y a 2 ans
Parent
commit
e4d09d4ad4
32 fichiers modifiés avec 1990 ajouts et 126 suppressions
  1. 24 0
      apps/emqx/src/bhvrs/emqx_config_backup.erl
  2. 19 0
      apps/emqx/src/bhvrs/emqx_db_backup.erl
  3. 8 0
      apps/emqx/src/emqx_banned.erl
  4. 36 0
      apps/emqx_authn/src/emqx_authn.erl
  5. 12 0
      apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl
  6. 10 2
      apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl
  7. 62 4
      apps/emqx_authz/src/emqx_authz.erl
  8. 9 0
      apps/emqx_authz/src/emqx_authz_mnesia.erl
  9. 33 5
      apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl
  10. 89 7
      apps/emqx_bridge/src/emqx_bridge.erl
  11. 10 0
      apps/emqx_dashboard/src/emqx_dashboard_admin.erl
  12. 35 8
      apps/emqx_exhook/src/emqx_exhook_mgr.erl
  13. 31 8
      apps/emqx_gateway/src/emqx_gateway_conf.erl
  14. 4 15
      apps/emqx_management/src/emqx_mgmt_api_api_keys.erl
  15. 16 28
      apps/emqx_management/src/emqx_mgmt_api_listeners.erl
  16. 23 1
      apps/emqx_management/src/emqx_mgmt_auth.erl
  17. 34 1
      apps/emqx_management/src/emqx_mgmt_cli.erl
  18. 690 0
      apps/emqx_management/src/emqx_mgmt_data_backup.erl
  19. 96 0
      apps/emqx_management/src/emqx_mgmt_listeners_conf.erl
  20. 519 0
      apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
  21. BIN
      apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-test-bootstrap-ce.tar.gz
  22. 12 12
      apps/emqx_modules/src/emqx_delayed.erl
  23. 11 7
      apps/emqx_modules/src/emqx_rewrite.erl
  24. 1 1
      apps/emqx_psk/src/emqx_psk.app.src
  25. 34 2
      apps/emqx_psk/src/emqx_psk.erl
  26. 10 10
      apps/emqx_retainer/src/emqx_retainer.erl
  27. 35 11
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  28. 38 1
      apps/emqx_utils/src/emqx_utils.erl
  29. 4 0
      changes/ce/feat-10676.en.md
  30. 50 3
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl
  31. 4 0
      lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl
  32. 31 0
      lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

+ 24 - 0
apps/emqx/src/bhvrs/emqx_config_backup.erl

@@ -0,0 +1,24 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_config_backup).
+
+-callback import_config(RawConf :: map()) ->
+    {ok, #{
+        root_key => emqx_utils_maps:config_key(),
+        changed => [emqx_utils_maps:config_path()]
+    }}
+    | {error, #{root_key => emqx_utils_maps:config_key(), reason => term()}}.

+ 19 - 0
apps/emqx/src/bhvrs/emqx_db_backup.erl

@@ -0,0 +1,19 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_db_backup).
+
+-callback backup_tables() -> [mria:table()].

+ 8 - 0
apps/emqx/src/emqx_banned.erl

@@ -17,6 +17,7 @@
 -module(emqx_banned).
 
 -behaviour(gen_server).
+-behaviour(emqx_db_backup).
 
 -include("emqx.hrl").
 -include("logger.hrl").
@@ -50,6 +51,8 @@
     code_change/3
 ]).
 
+-export([backup_tables/0]).
+
 %% Internal exports (RPC)
 -export([
     expire_banned_items/1
@@ -82,6 +85,11 @@ mnesia(boot) ->
         {storage_properties, [{ets, [{read_concurrency, true}]}]}
     ]).
 
+%%--------------------------------------------------------------------
+%% Data backup
+%%--------------------------------------------------------------------
+backup_tables() -> [?BANNED_TAB].
+
 %% @doc Start the banned server.
 -spec start_link() -> startlink_ret().
 start_link() ->

+ 36 - 0
apps/emqx_authn/src/emqx_authn.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_authn).
 
+-behaviour(emqx_config_backup).
+
 -export([
     providers/0,
     check_config/1,
@@ -24,6 +26,11 @@
     get_enabled_authns/0
 ]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 -include("emqx_authn.hrl").
 
 providers() ->
@@ -126,3 +133,32 @@ get_enabled_authns() ->
 
 tally_authenticators(#{id := AuthenticatorName}, Acc) ->
     maps:update_with(AuthenticatorName, fun(N) -> N + 1 end, 1, Acc).
+
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+
+-define(IMPORT_OPTS, #{override_to => cluster}).
+
+import_config(RawConf) ->
+    AuthnList = authn_list(maps:get(?CONF_NS_BINARY, RawConf, [])),
+    OldAuthnList = emqx:get_raw_config([?CONF_NS_BINARY], []),
+    MergedAuthnList = emqx_utils:merge_lists(
+        OldAuthnList, AuthnList, fun emqx_authentication:authenticator_id/1
+    ),
+    case emqx_conf:update([?CONF_NS_ATOM], MergedAuthnList, ?IMPORT_OPTS) of
+        {ok, #{raw_config := NewRawConf}} ->
+            {ok, #{root_key => ?CONF_NS_ATOM, changed => changed_paths(OldAuthnList, NewRawConf)}};
+        Error ->
+            {error, #{root_key => ?CONF_NS_ATOM, reason => Error}}
+    end.
+
+changed_paths(OldAuthnList, NewAuthnList) ->
+    KeyFun = fun emqx_authentication:authenticator_id/1,
+    Changed = maps:get(changed, emqx_utils:diff_lists(NewAuthnList, OldAuthnList, KeyFun)),
+    [[?CONF_NS_BINARY, emqx_authentication:authenticator_id(OldAuthn)] || {OldAuthn, _} <- Changed].
+
+authn_list(Authn) when is_list(Authn) ->
+    Authn;
+authn_list(Authn) when is_map(Authn) ->
+    [Authn].

+ 12 - 0
apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl

@@ -22,6 +22,7 @@
 
 -behaviour(hocon_schema).
 -behaviour(emqx_authentication).
+-behaviour(emqx_db_backup).
 
 -export([
     namespace/0,
@@ -54,6 +55,8 @@
     group_match_spec/1
 ]).
 
+-export([backup_tables/0]).
+
 %% Internal exports (RPC)
 -export([
     do_destroy/1,
@@ -101,6 +104,12 @@ mnesia(boot) ->
         {storage_properties, [{ets, [{read_concurrency, true}]}]}
     ]).
 
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+
+backup_tables() -> [?TAB].
+
 %%------------------------------------------------------------------------------
 %% Hocon Schema
 %%------------------------------------------------------------------------------
@@ -357,6 +366,9 @@ check_client_final_message(Bin, #{is_superuser := IsSuperuser} = Cache, #{algori
 
 add_user(UserGroup, UserID, Password, IsSuperuser, State) ->
     {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(Password, State),
+    write_user(UserGroup, UserID, StoredKey, ServerKey, Salt, IsSuperuser).
+
+write_user(UserGroup, UserID, StoredKey, ServerKey, Salt, IsSuperuser) ->
     UserInfo = #user_info{
         user_id = {UserGroup, UserID},
         stored_key = StoredKey,

+ 10 - 2
apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl

@@ -23,6 +23,7 @@
 
 -behaviour(hocon_schema).
 -behaviour(emqx_authentication).
+-behaviour(emqx_db_backup).
 
 -export([
     namespace/0,
@@ -66,6 +67,10 @@
     import_csv/3
 ]).
 
+-export([mnesia/1]).
+
+-export([backup_tables/0]).
+
 -type user_group() :: binary().
 -type user_id() :: binary().
 
@@ -76,8 +81,6 @@
     is_superuser :: boolean()
 }).
 
--export([mnesia/1]).
-
 -boot_mnesia({mnesia, [boot]}).
 
 -define(TAB, ?MODULE).
@@ -103,6 +106,11 @@ mnesia(boot) ->
         {storage_properties, [{ets, [{read_concurrency, true}]}]}
     ]).
 
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+backup_tables() -> [?TAB].
+
 %%------------------------------------------------------------------------------
 %% Hocon Schema
 %%------------------------------------------------------------------------------

+ 62 - 4
apps/emqx_authz/src/emqx_authz.erl

@@ -15,7 +15,9 @@
 %%--------------------------------------------------------------------
 
 -module(emqx_authz).
+
 -behaviour(emqx_config_handler).
+-behaviour(emqx_config_backup).
 
 -include("emqx_authz.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -44,6 +46,13 @@
 
 -export([acl_conf_file/0]).
 
+%% Data backup
+-export([
+    import_config/1,
+    maybe_read_acl_file/1,
+    maybe_write_acl_file/1
+]).
+
 -type source() :: map().
 
 -type match_result() :: {matched, allow} | {matched, deny} | nomatch.
@@ -326,9 +335,9 @@ init_metrics(Source) ->
             )
     end.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% AuthZ callbacks
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 %% @doc Check AuthZ
 -spec authorize(
@@ -451,9 +460,58 @@ do_authorize(
 get_enabled_authzs() ->
     lists:usort([Type || #{type := Type, enable := true} <- lookup()]).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+
+import_config(#{?CONF_NS_BINARY := AuthzConf}) ->
+    Sources = maps:get(<<"sources">>, AuthzConf, []),
+    OldSources = emqx:get_raw_config(?CONF_KEY_PATH, []),
+    MergedSources = emqx_utils:merge_lists(OldSources, Sources, fun type/1),
+    MergedAuthzConf = AuthzConf#{<<"sources">> => MergedSources},
+    case emqx_conf:update([?CONF_NS_ATOM], MergedAuthzConf, #{override_to => cluster}) of
+        {ok, #{raw_config := #{<<"sources">> := NewSources}}} ->
+            {ok, #{
+                root_key => ?CONF_NS_ATOM,
+                changed => changed_paths(OldSources, NewSources)
+            }};
+        Error ->
+            {error, #{root_key => ?CONF_NS_ATOM, reason => Error}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => ?CONF_NS_ATOM, changed => []}}.
+
+changed_paths(OldSources, NewSources) ->
+    Changed = maps:get(changed, emqx_utils:diff_lists(NewSources, OldSources, fun type/1)),
+    [?CONF_KEY_PATH ++ [type(OldSource)] || {OldSource, _} <- Changed].
+
+maybe_read_acl_file(RawConf) ->
+    maybe_convert_acl_file(RawConf, fun read_acl_file/1).
+
+maybe_write_acl_file(RawConf) ->
+    maybe_convert_acl_file(RawConf, fun write_acl_file/1).
+
+maybe_convert_acl_file(
+    #{?CONF_NS_BINARY := #{<<"sources">> := Sources} = AuthRawConf} = RawConf, Fun
+) ->
+    Sources1 = lists:map(
+        fun
+            (#{<<"type">> := <<"file">>} = FileSource) -> Fun(FileSource);
+            (Source) -> Source
+        end,
+        Sources
+    ),
+    RawConf#{?CONF_NS_BINARY => AuthRawConf#{<<"sources">> => Sources1}};
+maybe_convert_acl_file(RawConf, _Fun) ->
+    RawConf.
+
+read_acl_file(#{<<"path">> := Path} = Source) ->
+    {ok, Rules} = emqx_authz_file:read_file(Path),
+    maps:remove(<<"path">>, Source#{<<"rules">> => Rules}).
+
+%%------------------------------------------------------------------------------
 %% Internal function
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 client_info_source() ->
     emqx_authz_client_info:create(

+ 9 - 0
apps/emqx_authz/src/emqx_authz_mnesia.erl

@@ -42,6 +42,7 @@
 }).
 
 -behaviour(emqx_authz).
+-behaviour(emqx_db_backup).
 
 %% AuthZ Callbacks
 -export([
@@ -65,6 +66,8 @@
     record_count/0
 ]).
 
+-export([backup_tables/0]).
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -119,6 +122,12 @@ authorize(
             end,
     do_authorize(Client, PubSub, Topic, Rules).
 
+%%--------------------------------------------------------------------
+%% Data backup
+%%--------------------------------------------------------------------
+
+backup_tables() -> [?ACL_TABLE].
+
 %%--------------------------------------------------------------------
 %% Management API
 %%--------------------------------------------------------------------

+ 33 - 5
apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_auto_subscribe).
 
+-behaviour(emqx_config_backup).
+
 -include_lib("emqx/include/emqx_hooks.hrl").
 
 -behaviour(emqx_config_handler).
@@ -24,7 +26,6 @@
 
 -define(MAX_AUTO_SUBSCRIBE, 20).
 
-%
 -export([load/0, unload/0]).
 
 -export([
@@ -40,6 +41,11 @@
 %% exported for `emqx_telemetry'
 -export([get_basic_usage_info/0]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 load() ->
     ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE),
     update_hook().
@@ -73,8 +79,9 @@ post_config_update(_KeyPath, _Req, NewTopics, _OldConf, _AppEnvs) ->
     Config = emqx_conf:get([auto_subscribe], #{}),
     update_hook(Config#{topics => NewTopics}).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% hook
+%%------------------------------------------------------------------------------
 
 on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
     case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of
@@ -87,17 +94,38 @@ on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
 on_client_connected(_, _, _) ->
     ok.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Telemetry
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -spec get_basic_usage_info() -> #{auto_subscribe_count => non_neg_integer()}.
 get_basic_usage_info() ->
     AutoSubscribe = emqx_conf:get([auto_subscribe, topics], []),
     #{auto_subscribe_count => length(AutoSubscribe)}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+
+import_config(#{<<"auto_subscribe">> := #{<<"topics">> := Topics}}) ->
+    ConfPath = [auto_subscribe, topics],
+    OldTopics = emqx:get_raw_config(ConfPath, []),
+    KeyFun = fun(#{<<"topic">> := T}) -> T end,
+    MergedTopics = emqx_utils:merge_lists(OldTopics, Topics, KeyFun),
+    case emqx_conf:update(ConfPath, MergedTopics, #{override_to => cluster}) of
+        {ok, #{raw_config := NewTopics}} ->
+            Changed = maps:get(changed, emqx_utils:diff_lists(NewTopics, OldTopics, KeyFun)),
+            Changed1 = [ConfPath ++ [T] || {#{<<"topic">> := T}, _} <- Changed],
+            {ok, #{root_key => auto_subscribe, changed => Changed1}};
+        Error ->
+            {error, #{root_key => auto_subscribe, reason => Error}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => auto_subscribe, changed => []}}.
+
+%%------------------------------------------------------------------------------
 %% internal
+%%------------------------------------------------------------------------------
 
 format(Rules) when is_list(Rules) ->
     [format(Rule) || Rule <- Rules];

+ 89 - 7
apps/emqx_bridge/src/emqx_bridge.erl

@@ -14,13 +14,19 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 -module(emqx_bridge).
+
 -behaviour(emqx_config_handler).
+-behaviour(emqx_config_backup).
+
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--export([post_config_update/5]).
+-export([
+    pre_config_update/3,
+    post_config_update/5
+]).
 
 -export([
     load_hook/0,
@@ -53,6 +59,11 @@
 %% exported for `emqx_telemetry'
 -export([get_basic_usage_info/0]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 -define(EGRESS_DIR_BRIDGES(T),
     T == webhook;
     T == mysql;
@@ -80,8 +91,10 @@
     T == iotdb
 ).
 
+-define(ROOT_KEY, bridges).
+
 load() ->
-    Bridges = emqx:get_config([bridges], #{}),
+    Bridges = emqx:get_config([?ROOT_KEY], #{}),
     lists:foreach(
         fun({Type, NamedConf}) ->
             lists:foreach(
@@ -98,7 +111,7 @@ load() ->
 
 unload() ->
     unload_hook(),
-    Bridges = emqx:get_config([bridges], #{}),
+    Bridges = emqx:get_config([?ROOT_KEY], #{}),
     lists:foreach(
         fun({Type, NamedConf}) ->
             lists:foreach(
@@ -139,7 +152,7 @@ reload_hook(Bridges) ->
     ok = load_hook(Bridges).
 
 load_hook() ->
-    Bridges = emqx:get_config([bridges], #{}),
+    Bridges = emqx:get_config([?ROOT_KEY], #{}),
     load_hook(Bridges).
 
 load_hook(Bridges) ->
@@ -210,7 +223,7 @@ send_message(BridgeId, Message) ->
     send_message(BridgeType, BridgeName, ResId, Message).
 
 send_message(BridgeType, BridgeName, ResId, Message) ->
-    case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
+    case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
         not_found ->
             {error, bridge_not_found};
         #{enable := true} = Config ->
@@ -231,9 +244,14 @@ query_opts(Config) ->
     end.
 
 config_key_path() ->
-    [bridges].
+    [?ROOT_KEY].
 
-post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
+pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
+    {ok, RawConf};
+pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
+    {ok, convert_certs(NewConf)}.
+
+post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
     #{added := Added, removed := Removed, changed := Updated} =
         diff_confs(NewConf, OldConf),
     %% The config update will be failed if any task in `perform_bridge_changes` failed.
@@ -351,10 +369,74 @@ check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
             remove(BridgeType, BridgeName)
     end.
 
+%%----------------------------------------------------------------------------------------
+%% Data backup
+%%----------------------------------------------------------------------------------------
+
+import_config(RawConf) ->
+    RootKeyPath = config_key_path(),
+    BridgesConf = maps:get(<<"bridges">>, RawConf, #{}),
+    OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}),
+    MergedConf = merge_confs(OldBridgesConf, BridgesConf),
+    case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of
+        {ok, #{raw_config := NewRawConf}} ->
+            {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldBridgesConf, NewRawConf)}};
+        Error ->
+            {error, #{root_key => ?ROOT_KEY, reason => Error}}
+    end.
+
+merge_confs(OldConf, NewConf) ->
+    AllTypes = maps:keys(maps:merge(OldConf, NewConf)),
+    lists:foldr(
+        fun(Type, Acc) ->
+            NewBridges = maps:get(Type, NewConf, #{}),
+            OldBridges = maps:get(Type, OldConf, #{}),
+            Acc#{Type => maps:merge(OldBridges, NewBridges)}
+        end,
+        #{},
+        AllTypes
+    ).
+
+changed_paths(OldRawConf, NewRawConf) ->
+    maps:fold(
+        fun(Type, Bridges, ChangedAcc) ->
+            OldBridges = maps:get(Type, OldRawConf, #{}),
+            Changed = maps:get(changed, emqx_utils_maps:diff_maps(Bridges, OldBridges)),
+            [[?ROOT_KEY, Type, K] || K <- maps:keys(Changed)] ++ ChangedAcc
+        end,
+        [],
+        NewRawConf
+    ).
+
 %%========================================================================================
 %% Helper functions
 %%========================================================================================
 
+convert_certs(BridgesConf) ->
+    maps:map(
+        fun(Type, Bridges) ->
+            maps:map(
+                fun(Name, BridgeConf) ->
+                    Path = filename:join([?ROOT_KEY, Type, Name]),
+                    case emqx_connector_ssl:convert_certs(Path, BridgeConf) of
+                        {error, Reason} ->
+                            ?SLOG(error, #{
+                                msg => "bad_ssl_config",
+                                type => Type,
+                                name => Name,
+                                reason => Reason
+                            }),
+                            throw({bad_ssl_config, Reason});
+                        {ok, BridgeConf1} ->
+                            BridgeConf1
+                    end
+                end,
+                Bridges
+            )
+        end,
+        BridgesConf
+    ).
+
 perform_bridge_changes(Tasks) ->
     perform_bridge_changes(Tasks, ok).
 

+ 10 - 0
apps/emqx_dashboard/src/emqx_dashboard_admin.erl

@@ -24,6 +24,8 @@
 
 -boot_mnesia({mnesia, [boot]}).
 
+-behaviour(emqx_db_backup).
+
 %% Mnesia bootstrap
 -export([mnesia/1]).
 
@@ -54,6 +56,8 @@
     default_username/0
 ]).
 
+-export([backup_tables/0]).
+
 -type emqx_admin() :: #?ADMIN{}.
 -define(BOOTSTRAP_USER_TAG, <<"bootstrap user">>).
 
@@ -76,6 +80,12 @@ mnesia(boot) ->
         ]}
     ]).
 
+%%--------------------------------------------------------------------
+%% Data backup
+%%--------------------------------------------------------------------
+
+backup_tables() -> [?ADMIN].
+
 %%--------------------------------------------------------------------
 %% bootstrap API
 %%--------------------------------------------------------------------

+ 35 - 8
apps/emqx_exhook/src/emqx_exhook_mgr.erl

@@ -18,6 +18,7 @@
 -module(emqx_exhook_mgr).
 
 -behaviour(gen_server).
+-behaviour(emqx_config_backup).
 
 -include("emqx_exhook.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -66,6 +67,11 @@
 
 -export([roots/0]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 %% Running servers
 -type state() :: #{servers := servers()}.
 
@@ -98,9 +104,9 @@
 
 -export_type([servers/0, server/0]).
 
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% APIs
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 -spec start_link() ->
     ignore
@@ -137,7 +143,7 @@ call(Req) ->
 init_ref_counter_table() ->
     _ = ets:new(?HOOKS_REF_COUNTER, [named_table, public]).
 
-%%=====================================================================
+%%========================================================================================
 %% Hocon schema
 roots() ->
     emqx_exhook_schema:server_config().
@@ -179,9 +185,30 @@ post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) ->
     Result = call({update_config, UpdateReq, NewConf, OldConf}),
     {ok, Result}.
 
-%%--------------------------------------------------------------------
+%%========================================================================================
+
+%%----------------------------------------------------------------------------------------
+%% Data backup
+%%----------------------------------------------------------------------------------------
+
+import_config(#{<<"exhook">> := #{<<"servers">> := Servers} = ExHook}) ->
+    OldServers = emqx:get_raw_config(?SERVERS, []),
+    KeyFun = fun(#{<<"name">> := Name}) -> Name end,
+    ExHook1 = ExHook#{<<"servers">> => emqx_utils:merge_lists(OldServers, Servers, KeyFun)},
+    case emqx_conf:update(?EXHOOK, ExHook1, #{override_to => cluster}) of
+        {ok, #{raw_config := #{<<"servers">> := NewRawServers}}} ->
+            Changed = maps:get(changed, emqx_utils:diff_lists(NewRawServers, OldServers, KeyFun)),
+            ChangedPaths = [?SERVERS ++ [Name] || {#{<<"name">> := Name}, _} <- Changed],
+            {ok, #{root_key => ?EXHOOK, changed => ChangedPaths}};
+        Error ->
+            {error, #{root_key => ?EXHOOK, reason => Error}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => ?EXHOOK, changed => []}}.
+
+%%----------------------------------------------------------------------------------------
 %% gen_server callbacks
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 init([]) ->
     process_flag(trap_exit, true),
@@ -333,9 +360,9 @@ terminate(Reason, State = #{servers := Servers}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% Internal funcs
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 unload_exhooks() ->
     [
@@ -572,7 +599,7 @@ update_servers(Servers, State) ->
 set_disable(Server) ->
     Server#{status := disabled, timer := undefined}.
 
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% Server state persistent
 save(Name, ServerState) ->
     Saved = persistent_term:get(?APP, []),

+ 31 - 8
apps/emqx_gateway/src/emqx_gateway_conf.erl

@@ -18,6 +18,7 @@
 -module(emqx_gateway_conf).
 
 -behaviour(emqx_config_handler).
+-behaviour(emqx_config_backup).
 
 %% Load/Unload
 -export([
@@ -64,6 +65,11 @@
     post_config_update/5
 ]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_authentication.hrl").
 -define(AUTHN_BIN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY).
@@ -76,9 +82,9 @@
 -define(IS_SSL(T), (T == <<"ssl_options">> orelse T == <<"dtls_options">>)).
 -define(IGNORE_KEYS, [<<"listeners">>, ?AUTHN_BIN]).
 
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %%  Load/Unload
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 -define(GATEWAY, [gateway]).
 
 -spec load() -> ok.
@@ -89,7 +95,7 @@ load() ->
 unload() ->
     emqx_conf:remove_handler(?GATEWAY).
 
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% APIs
 
 -spec load_gateway(atom_or_bin(), map()) -> map_or_err().
@@ -365,9 +371,26 @@ ret_listener_or_err(GwName, {LType, LName}, {ok, #{raw_config := GwConf}}) ->
 ret_listener_or_err(_, _, Err) ->
     Err.
 
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
+%% Data backup
+%%----------------------------------------------------------------------------------------
+
+import_config(RawConf) ->
+    GatewayConf = maps:get(<<"gateway">>, RawConf, #{}),
+    OldGatewayConf = emqx:get_raw_config([<<"gateway">>], #{}),
+    MergedConf = maps:merge(OldGatewayConf, GatewayConf),
+    case emqx_conf:update([gateway], MergedConf, #{override_to => cluster}) of
+        {ok, #{raw_config := NewRawConf}} ->
+            Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawConf, OldGatewayConf)),
+            ChangedPaths = [[gateway, GwName] || GwName <- maps:keys(Changed)],
+            {ok, #{root_key => gateway, changed => ChangedPaths}};
+        Error ->
+            {error, #{root_key => gateway, reason => Error}}
+    end.
+
+%%----------------------------------------------------------------------------------------
 %% Config Handler
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 -spec pre_config_update(
     list(atom()),
@@ -793,9 +816,9 @@ post_config_update(?GATEWAY, _Req = #{}, NewConfig, OldConfig, _AppEnvs) ->
     ),
     ok.
 
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
+%% Internal funcs
+%%----------------------------------------------------------------------------------------
 
 tune_gw_certs(Fun, GwName, Conf) ->
     apply_to_gateway_basic_confs(

+ 4 - 15
apps/emqx_management/src/emqx_mgmt_api_api_keys.erl

@@ -183,7 +183,7 @@ delete(Keys, Fields) ->
     lists:foldl(fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end, Fields, Keys).
 
 api_key(get, _) ->
-    {200, [format(App) || App <- emqx_mgmt_auth:list()]};
+    {200, [emqx_mgmt_auth:format(App) || App <- emqx_mgmt_auth:list()]};
 api_key(post, #{body := App}) ->
     #{
         <<"name">> := Name,
@@ -194,7 +194,7 @@ api_key(post, #{body := App}) ->
     Desc = unicode:characters_to_binary(Desc0, unicode),
     case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of
         {ok, NewApp} ->
-            {200, format(NewApp)};
+            {200, emqx_mgmt_auth:format(NewApp)};
         {error, Reason} ->
             {400, #{
                 code => 'BAD_REQUEST',
@@ -206,7 +206,7 @@ api_key(post, #{body := App}) ->
 
 api_key_by_name(get, #{bindings := #{name := Name}}) ->
     case emqx_mgmt_auth:read(Name) of
-        {ok, App} -> {200, format(App)};
+        {ok, App} -> {200, emqx_mgmt_auth:format(App)};
         {error, not_found} -> {404, ?NOT_FOUND_RESPONSE}
     end;
 api_key_by_name(delete, #{bindings := #{name := Name}}) ->
@@ -219,20 +219,9 @@ api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) ->
     ExpiredAt = ensure_expired_at(Body),
     Desc = maps:get(<<"desc">>, Body, undefined),
     case emqx_mgmt_auth:update(Name, Enable, ExpiredAt, Desc) of
-        {ok, App} -> {200, format(App)};
+        {ok, App} -> {200, emqx_mgmt_auth:format(App)};
         {error, not_found} -> {404, ?NOT_FOUND_RESPONSE}
     end.
 
-format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) ->
-    ExpiredAt =
-        case ExpiredAt0 of
-            infinity -> <<"infinity">>;
-            _ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0))
-        end,
-    App#{
-        expired_at => ExpiredAt,
-        created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt))
-    }.
-
 ensure_expired_at(#{<<"expired_at">> := ExpiredAt}) when is_integer(ExpiredAt) -> ExpiredAt;
 ensure_expired_at(_) -> infinity.

+ 16 - 28
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -19,7 +19,6 @@
 -behaviour(minirest_api).
 
 -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]).
--import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
 
 -export([
     listener_type_status/2,
@@ -36,6 +35,16 @@
     do_list_listeners/0
 ]).
 
+-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
+
+-import(emqx_mgmt_listeners_conf, [
+    action/4,
+    create/3,
+    ensure_remove/2,
+    get_raw/2,
+    update/3
+]).
+
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
@@ -44,7 +53,6 @@
 -define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
 -define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
 -define(ADDR_PORT_INUSE, <<"Addr port in use">>).
--define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
 
 namespace() -> "listeners".
 
@@ -387,14 +395,13 @@ crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
 crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
     case parse_listener_conf(Body0) of
         {Id, Type, Name, Conf} ->
-            Path = [listeners, Type, Name],
-            case emqx_conf:get_raw(Path, undefined) of
+            case get_raw(Type, Name) of
                 undefined ->
                     {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
                 PrevConf ->
                     MergeConfT = emqx_utils_maps:deep_merge(PrevConf, Conf),
                     MergeConf = emqx_listeners:ensure_override_limiter_conf(MergeConfT, Conf),
-                    case update(Path, MergeConf) of
+                    case update(Type, Name, MergeConf) of
                         {ok, #{raw_config := _RawConf}} ->
                             crud_listeners_by_id(get, #{bindings => #{id => Id}});
                         {error, not_found} ->
@@ -412,7 +419,7 @@ crud_listeners_by_id(post, #{body := Body}) ->
     create_listener(Body);
 crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
     {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    case ensure_remove([listeners, Type, Name]) of
+    case ensure_remove(Type, Name) of
         {ok, _} -> {204};
         {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
     end.
@@ -457,12 +464,11 @@ restart_listeners_by_id(Method, Body = #{bindings := Bindings}) ->
 
 action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
     {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
-    Path = [listeners, Type, Name],
-    case emqx_conf:get_raw(Path, undefined) of
+    case get_raw(Type, Name) of
         undefined ->
             {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
         _PrevConf ->
-            case action(Path, Action, enabled(Action)) of
+            case action(Type, Name, Action, enabled(Action)) of
                 {ok, #{raw_config := _RawConf}} ->
                     {200};
                 {error, not_found} ->
@@ -634,23 +640,6 @@ max_conn(_Int1, <<"infinity">>) -> <<"infinity">>;
 max_conn(<<"infinity">>, _Int) -> <<"infinity">>;
 max_conn(Int1, Int2) -> Int1 + Int2.
 
-update(Path, Conf) ->
-    wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
-
-action(Path, Action, Conf) ->
-    wrap(emqx_conf:update(Path, {action, Action, Conf}, ?OPTS(cluster))).
-
-create(Path, Conf) ->
-    wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
-
-ensure_remove(Path) ->
-    wrap(emqx_conf:tombstone(Path, ?OPTS(cluster))).
-
-wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason};
-wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason};
-wrap({error, Reason}) -> {error, Reason};
-wrap(Ok) -> Ok.
-
 listener_type_status_example() ->
     [
         #{
@@ -813,8 +802,7 @@ tcp_schema_example() ->
 create_listener(Body) ->
     case parse_listener_conf(Body) of
         {Id, Type, Name, Conf} ->
-            Path = [listeners, Type, Name],
-            case create(Path, Conf) of
+            case create(Type, Name, Conf) of
                 {ok, #{raw_config := _RawConf}} ->
                     crud_listeners_by_id(get, #{bindings => #{id => Id}});
                 {error, already_exist} ->

+ 23 - 1
apps/emqx_management/src/emqx_mgmt_auth.erl

@@ -17,6 +17,8 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-behaviour(emqx_db_backup).
+
 %% API
 -export([mnesia/1]).
 -boot_mnesia({mnesia, [boot]}).
@@ -28,12 +30,15 @@
     update/4,
     delete/1,
     list/0,
-    init_bootstrap_file/0
+    init_bootstrap_file/0,
+    format/1
 ]).
 
 -export([authorize/3]).
 -export([post_config_update/5]).
 
+-export([backup_tables/0]).
+
 %% Internal exports (RPC)
 -export([
     do_update/4,
@@ -67,6 +72,12 @@ mnesia(boot) ->
         {attributes, record_info(fields, ?APP)}
     ]).
 
+%%--------------------------------------------------------------------
+%% Data backup
+%%--------------------------------------------------------------------
+
+backup_tables() -> [?APP].
+
 post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) ->
     #{bootstrap_file := File} = NewConf,
     case init_bootstrap_file(File) of
@@ -127,6 +138,17 @@ do_delete(Name) ->
         [_App] -> mnesia:delete({?APP, Name})
     end.
 
+format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) ->
+    ExpiredAt =
+        case ExpiredAt0 of
+            infinity -> <<"infinity">>;
+            _ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0))
+        end,
+    App#{
+        expired_at => ExpiredAt,
+        created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt))
+    }.
+
 list() ->
     to_map(ets:match_object(?APP, #?APP{_ = '_'})).
 

+ 34 - 1
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -25,6 +25,7 @@
 -include("emqx_mgmt.hrl").
 
 -define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])).
+-define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
 
 -export([load/0]).
 
@@ -44,7 +45,8 @@
     log/1,
     authz/1,
     pem_cache/1,
-    olp/1
+    olp/1,
+    data/1
 ]).
 
 -define(PROC_INFOKEYS, [
@@ -739,6 +741,37 @@ olp(_) ->
         {"olp disable", "Disable overload protection"}
     ]).
 
+%%--------------------------------------------------------------------
+%% @doc data Command
+
+data(["export"]) ->
+    case emqx_mgmt_data_backup:export(?DATA_BACKUP_OPTS) of
+        {ok, #{filename := Filename}} ->
+            emqx_ctl:print("Data has been successfully exported to ~s.~n", [Filename]);
+        {error, Reason} ->
+            Reason1 = emqx_mgmt_data_backup:format_error(Reason),
+            emqx_ctl:print("[error] Data export failed, reason: ~p.~n", [Reason1])
+    end;
+data(["import", Filename]) ->
+    case emqx_mgmt_data_backup:import(Filename, ?DATA_BACKUP_OPTS) of
+        {ok, #{db_errors := DbErrs, config_errors := ConfErrs}} when
+            map_size(DbErrs) =:= 0, map_size(ConfErrs) =:= 0
+        ->
+            emqx_ctl:print("Data has been imported successfully.~n");
+        {ok, _} ->
+            emqx_ctl:print(
+                "Data has been imported, but some errors occurred, see the the log above.~n"
+            );
+        {error, Reason} ->
+            Reason1 = emqx_mgmt_data_backup:format_error(Reason),
+            emqx_ctl:print("[error] Data import failed, reason: ~p.~n", [Reason1])
+    end;
+data(_) ->
+    emqx_ctl:usage([
+        {"data import <File>", "Import data from the specified tar archive file"},
+        {"data export", "Export data"}
+    ]).
+
 %%--------------------------------------------------------------------
 %% Dump ETS
 %%--------------------------------------------------------------------

+ 690 - 0
apps/emqx_management/src/emqx_mgmt_data_backup.erl

@@ -0,0 +1,690 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_data_backup).
+
+-export([
+    export/0,
+    export/1,
+    import/1,
+    import/2,
+    format_error/1
+]).
+
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
+-include_lib("kernel/include/file.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-define(ROOT_BACKUP_DIR, "backup").
+-define(BACKUP_MNESIA_DIR, "mnesia").
+-define(TAR_SUFFIX, ".tar.gz").
+-define(META_FILENAME, "META.hocon").
+-define(CLUSTER_HOCON_FILENAME, "cluster.hocon").
+-define(CONF_KEYS, [
+    <<"delayed">>,
+    <<"rewrite">>,
+    <<"retainer">>,
+    <<"mqtt">>,
+    <<"alarm">>,
+    <<"sysmon">>,
+    <<"sys_topics">>,
+    <<"limiter">>,
+    <<"log">>,
+    <<"persistent_session_store">>,
+    <<"prometheus">>,
+    <<"crl_cache">>,
+    <<"conn_congestion">>,
+    <<"force_shutdown">>,
+    <<"flapping_detect">>,
+    <<"broker">>,
+    <<"force_gc">>,
+    <<"zones">>
+]).
+
+-define(DEFAULT_OPTS, #{}).
+-define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX).
+-define(fmt_tar_err(_Expr_),
+    fun() ->
+        case _Expr_ of
+            {error, _Reason_} -> {error, erl_tar:format_error(_Reason_)};
+            _Other_ -> _Other_
+        end
+    end()
+).
+
+-type backup_file_info() :: #{
+    filename => binary(),
+    size => non_neg_integer(),
+    created_at => binary(),
+    node => node(),
+    atom() => _
+}.
+
+-type db_error_details() :: #{mria:table() => {error, _}}.
+-type config_error_details() :: #{emqx_utils_maps:config_path() => {error, _}}.
+
+%%------------------------------------------------------------------------------
+%% APIs
+%%------------------------------------------------------------------------------
+
+-spec export() -> {ok, backup_file_info()} | {error, _}.
+export() ->
+    export(?DEFAULT_OPTS).
+
+-spec export(map()) -> {ok, backup_file_info()} | {error, _}.
+export(Opts) ->
+    {BackupName, TarDescriptor} = prepare_new_backup(Opts),
+    try
+        do_export(BackupName, TarDescriptor, Opts)
+    catch
+        Class:Reason:Stack ->
+            ?SLOG(error, #{
+                msg => "emqx_data_export_failed",
+                exception => Class,
+                reason => Reason,
+                stacktrace => Stack
+            }),
+            {error, Reason}
+    after
+        %% erl_tar:close/1 raises error if called on an already closed tar
+        catch erl_tar:close(TarDescriptor),
+        file:del_dir_r(BackupName)
+    end.
+
+-spec import(file:filename_all()) ->
+    {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
+    | {error, _}.
+import(BackupFileName) ->
+    import(BackupFileName, ?DEFAULT_OPTS).
+
+-spec import(file:filename_all(), map()) ->
+    {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
+    | {error, _}.
+import(BackupFileName, Opts) ->
+    case is_import_allowed() of
+        true ->
+            case lookup_file(str(BackupFileName)) of
+                {ok, FilePath} ->
+                    do_import(FilePath, Opts);
+                Err ->
+                    Err
+            end;
+        false ->
+            {error, not_core_node}
+    end.
+
+format_error(not_core_node) ->
+    str(
+        io_lib:format(
+            "backup data import is only allowed on core EMQX nodes, but requested node ~p is not core",
+            [node()]
+        )
+    );
+format_error(ee_to_ce_backup) ->
+    "importing EMQX Enterprise data backup to EMQX is not allowed";
+format_error(missing_backup_meta) ->
+    "invalid backup archive file: missing " ?META_FILENAME;
+format_error(invalid_edition) ->
+    "invalid backup archive content: wrong EMQX edition value in " ?META_FILENAME;
+format_error(invalid_version) ->
+    "invalid backup archive content: wrong EMQX version value in " ?META_FILENAME;
+format_error(bad_archive_dir) ->
+    "invalid backup archive content: all files in the archive must be under <backup name> directory";
+format_error(not_found) ->
+    "backup file not found";
+format_error(bad_backup_name) ->
+    "invalid backup name: file name must have " ?TAR_SUFFIX " extension";
+format_error({unsupported_version, ImportVersion}) ->
+    str(
+        io_lib:format(
+            "[warning] Backup version ~p is newer than EMQX version ~p, import is not allowed.~n",
+            [str(ImportVersion), str(emqx_release:version())]
+        )
+    );
+format_error(Reason) ->
+    Reason.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+prepare_new_backup(Opts) ->
+    Ts = erlang:system_time(millisecond),
+    {{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts),
+    BackupBaseName = str(
+        io_lib:format(
+            "emqx-export-~0p-~2..0b-~2..0b-~2..0b-~2..0b-~2..0b.~3..0b",
+            [Y, M, D, HH, MM, SS, Ts rem 1000]
+        )
+    ),
+    BackupName = filename:join(root_backup_dir(), BackupBaseName),
+    BackupTarName = ?tar(BackupName),
+    maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts),
+    {ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])),
+    {BackupName, TarDescriptor}.
+
+do_export(BackupName, TarDescriptor, Opts) ->
+    BackupBaseName = filename:basename(BackupName),
+    BackupTarName = ?tar(BackupName),
+    Meta = #{
+        version => emqx_release:version(),
+        edition => emqx_release:edition()
+    },
+    MetaBin = bin(hocon_pp:do(Meta, #{})),
+    MetaFileName = filename:join(BackupBaseName, ?META_FILENAME),
+
+    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MetaBin, MetaFileName, [])),
+    ok = export_cluster_hocon(TarDescriptor, BackupBaseName, Opts),
+    ok = export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts),
+    ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)),
+    {ok, #file_info{
+        size = Size,
+        ctime = {{Y1, M1, D1}, {H1, MM1, S1}}
+    }} = file:read_file_info(BackupTarName),
+    CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]),
+    {ok, #{
+        filename => bin(BackupTarName),
+        size => Size,
+        created_at => bin(CreatedAt),
+        node => node()
+    }}.
+
+export_cluster_hocon(TarDescriptor, BackupBaseName, Opts) ->
+    maybe_print("Exporting cluster configuration...~n", [], Opts),
+    RawConf = emqx_config:read_override_conf(#{override_to => cluster}),
+    maybe_print(
+        "Exporting additional files from EMQX data_dir: ~p...~n", [str(emqx:data_dir())], Opts
+    ),
+    RawConf1 = read_data_files(RawConf),
+    RawConfBin = bin(hocon_pp:do(RawConf1, #{})),
+    NameInArchive = filename:join(BackupBaseName, ?CLUSTER_HOCON_FILENAME),
+    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, RawConfBin, NameInArchive, [])).
+
+export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts) ->
+    maybe_print("Exporting built-in database...~n", [], Opts),
+    lists:foreach(
+        fun(Tab) -> export_mnesia_tab(TarDescriptor, Tab, BackupName, BackupBaseName, Opts) end,
+        tabs_to_backup()
+    ).
+
+export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) ->
+    maybe_print("Exporting ~p database table...~n", [TabName], Opts),
+    {ok, MnesiaBackupName} = do_export_mnesia_tab(TabName, BackupName),
+    NameInArchive = mnesia_backup_name(BackupBaseName, TabName),
+    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MnesiaBackupName, NameInArchive, [])),
+    _ = file:delete(MnesiaBackupName),
+    ok.
+
+do_export_mnesia_tab(TabName, BackupName) ->
+    Node = node(),
+    try
+        {ok, TabName, [Node]} = mnesia:activate_checkpoint(
+            [{name, TabName}, {min, [TabName]}, {allow_remote, false}]
+        ),
+        MnesiaBackupName = mnesia_backup_name(BackupName, TabName),
+        ok = filelib:ensure_dir(MnesiaBackupName),
+        ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName),
+        {ok, MnesiaBackupName}
+    after
+        mnesia:deactivate_checkpoint(TabName)
+    end.
+
+-ifdef(TEST).
+tabs_to_backup() ->
+    %% Allow mocking in tests
+    ?MODULE:mnesia_tabs_to_backup().
+-else.
+tabs_to_backup() ->
+    mnesia_tabs_to_backup().
+-endif.
+
+mnesia_tabs_to_backup() ->
+    lists:flatten([M:backup_tables() || M <- find_behaviours(emqx_db_backup)]).
+
+mnesia_backup_name(Path, TabName) ->
+    filename:join([Path, ?BACKUP_MNESIA_DIR, atom_to_list(TabName)]).
+
+is_import_allowed() ->
+    mria_rlog:role() =:= core.
+
+validate_backup(BackupDir) ->
+    case hocon:files([filename:join(BackupDir, ?META_FILENAME)]) of
+        {ok, #{
+            <<"edition">> := Edition,
+            <<"version">> := Version
+        }} = Meta ->
+            validate(
+                [
+                    fun() -> check_edition(Edition) end,
+                    fun() -> check_version(Version) end
+                ],
+                Meta
+            );
+        _ ->
+            ?SLOG(error, #{msg => "missing_backup_meta", backup => BackupDir}),
+            {error, missing_backup_meta}
+    end.
+
+validate([ValidatorFun | T], OkRes) ->
+    case ValidatorFun() of
+        ok -> validate(T, OkRes);
+        Err -> Err
+    end;
+validate([], OkRes) ->
+    OkRes.
+
+check_edition(BackupEdition) when BackupEdition =:= <<"ce">>; BackupEdition =:= <<"ee">> ->
+    Edition = bin(emqx_release:edition()),
+    case {BackupEdition, Edition} of
+        {<<"ee">>, <<"ce">>} ->
+            {error, ee_to_ce_backup};
+        _ ->
+            ok
+    end;
+check_edition(BackupEdition) ->
+    ?SLOG(error, #{msg => "invalid_backup_edition", edition => BackupEdition}),
+    {error, invalid_edition}.
+
+check_version(ImportVersion) ->
+    case parse_version_no_patch(ImportVersion) of
+        {ok, {ImportMajorInt, ImportMinorInt}} ->
+            Version = emqx_release:version(),
+            {ok, {MajorInt, MinorInt}} = parse_version_no_patch(bin(Version)),
+            case ImportMajorInt > MajorInt orelse ImportMinorInt > MinorInt of
+                true ->
+                    %% 4.x backup files are anyway not compatible and will be treated as invalid,
+                    %% before this step,
+                    {error, {unsupported_version, str(ImportVersion)}};
+                false ->
+                    ok
+            end;
+        Err ->
+            Err
+    end.
+
+parse_version_no_patch(VersionBin) ->
+    case string:split(VersionBin, ".", all) of
+        [Major, Minor | _] ->
+            {MajorInt, _} = emqx_utils_binary:bin_to_int(Major),
+            {MinorInt, _} = emqx_utils_binary:bin_to_int(Minor),
+            {ok, {MajorInt, MinorInt}};
+        _ ->
+            ?SLOG(error, #{msg => "failed_to_parse_backup_version", version => VersionBin}),
+            {error, invalid_version}
+    end.
+
+do_import(BackupFileName, Opts) ->
+    BackupDir = filename:join(root_backup_dir(), filename:basename(BackupFileName, ?TAR_SUFFIX)),
+    maybe_print("Importing data from ~p...~n", [BackupFileName], Opts),
+    try
+        ok = validate_backup_name(BackupFileName),
+        ok = extract_backup(BackupFileName),
+        {ok, _} = validate_backup(BackupDir),
+        ConfErrors = import_cluster_hocon(BackupDir, Opts),
+        MnesiaErrors = import_mnesia_tabs(BackupDir, Opts),
+        ?SLOG(info, #{msg => "emqx_data_import_success"}),
+        {ok, #{db_errors => MnesiaErrors, config_errors => ConfErrors}}
+    catch
+        error:{badmatch, {error, Reason}}:Stack ->
+            ?SLOG(error, #{msg => "emqx_data_import_failed", reason => Reason, stacktrace => Stack}),
+            {error, Reason};
+        Class:Reason:Stack ->
+            ?SLOG(error, #{
+                msg => "emqx_data_import_failed",
+                exception => Class,
+                reason => Reason,
+                stacktrace => Stack
+            }),
+            {error, Reason}
+    after
+        file:del_dir_r(BackupDir)
+    end.
+
+import_mnesia_tabs(BackupDir, Opts) ->
+    maybe_print("Importing built-in database...~n", [], Opts),
+    filter_errors(
+        lists:foldr(
+            fun(Tab, Acc) -> Acc#{Tab => import_mnesia_tab(BackupDir, Tab, Opts)} end,
+            #{},
+            tabs_to_backup()
+        )
+    ).
+
+import_mnesia_tab(BackupDir, TabName, Opts) ->
+    MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
+    case filelib:is_regular(MnesiaBackupFileName) of
+        true ->
+            maybe_print("Importing ~p database table...~n", [TabName], Opts),
+            restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts);
+        false ->
+            maybe_print("No backup file for ~p database table...~n", [TabName], Opts),
+            ?SLOG(info, #{msg => "missing_mnesia_backup", table => TabName, backup => BackupDir}),
+            ok
+    end.
+
+restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) ->
+    BackupNameToImport = MnesiaBackupFileName ++ "_for_import",
+    Prepared =
+        catch mnesia:traverse_backup(
+            MnesiaBackupFileName, BackupNameToImport, fun backup_converter/2, 0
+        ),
+    try
+        case Prepared of
+            {ok, _} ->
+                Restored = mnesia:restore(BackupNameToImport, [{default_op, keep_tables}]),
+                case Restored of
+                    {atomic, [TabName]} ->
+                        ok;
+                    RestoreErr ->
+                        ?SLOG(error, #{
+                            msg => "failed_to_restore_mnesia_backup",
+                            table => TabName,
+                            backup => BackupDir,
+                            reason => RestoreErr
+                        }),
+                        maybe_print_mnesia_import_err(TabName, RestoreErr, Opts),
+                        {error, RestoreErr}
+                end;
+            PrepareErr ->
+                ?SLOG(error, #{
+                    msg => "failed_to_prepare_mnesia_backup_for_restoring",
+                    table => TabName,
+                    backup => BackupDir,
+                    reason => PrepareErr
+                }),
+                maybe_print_mnesia_import_err(TabName, PrepareErr, Opts),
+                PrepareErr
+        end
+    after
+        %% Cleanup files as soon as they are not needed any more for more efficient disk usage
+        _ = file:delete(BackupNameToImport),
+        _ = file:delete(MnesiaBackupFileName)
+    end.
+
+backup_converter({schema, Tab, CreateList}, Acc) ->
+    check_rec_attributes(Tab, CreateList),
+    {[{schema, Tab, lists:map(fun convert_copies/1, CreateList)}], Acc};
+backup_converter(Other, Acc) ->
+    {[Other], Acc}.
+
+check_rec_attributes(Tab, CreateList) ->
+    ImportAttributes = proplists:get_value(attributes, CreateList),
+    Attributes = mnesia:table_info(Tab, attributes),
+    case ImportAttributes =/= Attributes of
+        true ->
+            throw({error, different_table_schema});
+        false ->
+            ok
+    end.
+
+convert_copies({K, [_ | _]}) when K == ram_copies; K == disc_copies; K == disc_only_copies ->
+    {K, [node()]};
+convert_copies(Other) ->
+    Other.
+
+extract_backup(BackupFileName) ->
+    BackupDir = root_backup_dir(),
+    ok = validate_filenames(BackupFileName),
+    ?fmt_tar_err(erl_tar:extract(BackupFileName, [{cwd, BackupDir}, compressed])).
+
+validate_filenames(BackupFileName) ->
+    {ok, FileNames} = ?fmt_tar_err(erl_tar:table(BackupFileName, [compressed])),
+    BackupName = filename:basename(BackupFileName, ?TAR_SUFFIX),
+    IsValid = lists:all(
+        fun(FileName) ->
+            [Root | _] = filename:split(FileName),
+            Root =:= BackupName
+        end,
+        FileNames
+    ),
+    case IsValid of
+        true -> ok;
+        false -> {error, bad_archive_dir}
+    end.
+
+import_cluster_hocon(BackupDir, Opts) ->
+    HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
+    case filelib:is_regular(HoconFileName) of
+        true ->
+            {ok, RawConf} = hocon:files([HoconFileName]),
+            {ok, _} = validate_cluster_hocon(RawConf),
+            maybe_print("Importing cluster configuration...~n", [], Opts),
+            %% At this point, when all validations have been passed, we want to log errors (if any)
+            %% but proceed with the next items, instead of aborting the whole import operation
+            do_import_conf(RawConf, Opts);
+        false ->
+            maybe_print("No cluster configuration to be imported.~n", [], Opts),
+            ?SLOG(info, #{
+                msg => "no_backup_hocon_config_to_import",
+                backup => BackupDir
+            }),
+            #{}
+    end.
+
+read_data_files(RawConf) ->
+    DataDir = bin(emqx:data_dir()),
+    {ok, Cwd} = file:get_cwd(),
+    AbsDataDir = bin(filename:join(Cwd, DataDir)),
+    RawConf1 = emqx_authz:maybe_read_acl_file(RawConf),
+    emqx_utils_maps:deep_convert(RawConf1, fun read_data_file/4, [DataDir, AbsDataDir]).
+
+-define(dir_pattern(_Dir_), <<_Dir_:(byte_size(_Dir_))/binary, _/binary>>).
+
+read_data_file(Key, Val, DataDir, AbsDataDir) ->
+    Val1 =
+        case Val of
+            ?dir_pattern(DataDir) = FileName ->
+                do_read_file(FileName);
+            ?dir_pattern(AbsDataDir) = FileName ->
+                do_read_file(FileName);
+            V ->
+                V
+        end,
+    {Key, Val1}.
+
+do_read_file(FileName) ->
+    case file:read_file(FileName) of
+        {ok, Content} ->
+            Content;
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "failed_to_read_data_file",
+                filename => FileName,
+                reason => Reason
+            }),
+            FileName
+    end.
+
+validate_cluster_hocon(RawConf) ->
+    %% write ACL file to comply with the schema...
+    RawConf1 = emqx_authz:maybe_write_acl_file(RawConf),
+    emqx_hocon:check(
+        emqx_conf:schema_module(),
+        maps:merge(emqx:get_raw_config([]), RawConf1),
+        #{atom_key => false, required => false}
+    ).
+
+do_import_conf(RawConf, Opts) ->
+    GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
+    maybe_print_errors(GenConfErrs, Opts),
+    Errors =
+        lists:foldr(
+            fun(Module, ErrorsAcc) ->
+                Module:import_config(RawConf),
+                case Module:import_config(RawConf) of
+                    {ok, #{changed := Changed}} ->
+                        maybe_print_changed(Changed, Opts),
+                        ErrorsAcc;
+                    {error, #{root_key := RootKey, reason := Reason}} ->
+                        ErrorsAcc#{[RootKey] => Reason}
+                end
+            end,
+            GenConfErrs,
+            find_behaviours(emqx_config_backup)
+        ),
+    maybe_print_errors(Errors, Opts),
+    Errors.
+
+import_generic_conf(Data) ->
+    lists:map(
+        fun(Key) ->
+            case maps:get(Key, Data, undefined) of
+                undefined -> {[Key], ok};
+                Conf -> {[Key], emqx_conf:update([Key], Conf, #{override_to => cluster})}
+            end
+        end,
+        ?CONF_KEYS
+    ).
+
+maybe_print_changed(Changed, Opts) ->
+    lists:foreach(
+        fun(ChangedPath) ->
+            maybe_print(
+                "Config key path ~p was present before import and "
+                "has been overwritten.~n",
+                [pretty_path(ChangedPath)],
+                Opts
+            )
+        end,
+        Changed
+    ).
+
+maybe_print_errors(Errors, Opts) ->
+    maps:foreach(
+        fun(Path, Err) ->
+            maybe_print(
+                "Failed to import the following config path: ~p, reason: ~p~n",
+                [pretty_path(Path), Err],
+                Opts
+            )
+        end,
+        Errors
+    ).
+
+filter_errors(Results) ->
+    maps:filter(
+        fun
+            (_Path, {error, _}) -> true;
+            (_, _) -> false
+        end,
+        Results
+    ).
+
+pretty_path(Path) ->
+    str(lists:join(".", [str(Part) || Part <- Path])).
+
+str(Data) when is_atom(Data) ->
+    atom_to_list(Data);
+str(Data) ->
+    unicode:characters_to_list(Data).
+
+bin(Data) when is_atom(Data) ->
+    atom_to_binary(Data, utf8);
+bin(Data) ->
+    unicode:characters_to_binary(Data).
+
+validate_backup_name(FileName) ->
+    BaseName = filename:basename(FileName, ?TAR_SUFFIX),
+    ValidName = BaseName ++ ?TAR_SUFFIX,
+    case filename:basename(FileName) of
+        ValidName -> ok;
+        _ -> {error, bad_backup_name}
+    end.
+
+lookup_file(FileName) ->
+    case filelib:is_regular(FileName) of
+        true ->
+            {ok, FileName};
+        false ->
+            %% Only lookup by basename, don't allow to lookup by file path
+            case FileName =:= filename:basename(FileName) of
+                true ->
+                    FilePath = filename:join(root_backup_dir(), FileName),
+                    case filelib:is_file(FilePath) of
+                        true -> {ok, FilePath};
+                        false -> {error, not_found}
+                    end;
+                false ->
+                    {error, not_found}
+            end
+    end.
+
+root_backup_dir() ->
+    Dir = filename:join(emqx:data_dir(), ?ROOT_BACKUP_DIR),
+    ok = ensure_path(Dir),
+    Dir.
+
+-if(?OTP_RELEASE < 25).
+ensure_path(Path) -> filelib:ensure_dir(filename:join([Path, "dummy"])).
+-else.
+ensure_path(Path) -> filelib:ensure_path(Path).
+-endif.
+
+local_datetime(MillisecondTs) ->
+    calendar:system_time_to_local_time(MillisecondTs, millisecond).
+
+maybe_print(Format, Args, #{print_fun := PrintFun}) ->
+    PrintFun(Format, Args);
+maybe_print(_Format, _Args, _Opts) ->
+    ok.
+
+maybe_print_mnesia_import_err(TabName, Error, Opts) ->
+    maybe_print(
+        "[error] Failed to import built-in database table: ~p, reason: ~p~n",
+        [TabName, Error],
+        Opts
+    ).
+
+find_behaviours(Behaviour) ->
+    find_behaviours(Behaviour, apps(), []).
+
+%% Based on minirest_api:find_api_modules/1
+find_behaviours(_Behaviour, [] = _Apps, Acc) ->
+    Acc;
+find_behaviours(Behaviour, [App | Apps], Acc) ->
+    case application:get_key(App, modules) of
+        undefined ->
+            Acc;
+        {ok, Modules} ->
+            NewAcc = lists:filter(
+                fun(Module) ->
+                    Info = Module:module_info(attributes),
+                    Bhvrs = lists:flatten(
+                        proplists:get_all_values(behavior, Info) ++
+                            proplists:get_all_values(behaviour, Info)
+                    ),
+                    lists:member(Behaviour, Bhvrs)
+                end,
+                Modules
+            ),
+            find_behaviours(Behaviour, Apps, NewAcc ++ Acc)
+    end.
+
+apps() ->
+    [
+        App
+     || {App, _, _} <- application:loaded_applications(),
+        case re:run(atom_to_list(App), "^emqx") of
+            {match, [{0, 4}]} -> true;
+            _ -> false
+        end
+    ].

+ 96 - 0
apps/emqx_management/src/emqx_mgmt_listeners_conf.erl

@@ -0,0 +1,96 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_listeners_conf).
+
+-behaviour(emqx_config_backup).
+
+-export([
+    action/4,
+    create/3,
+    ensure_remove/2,
+    get_raw/2,
+    update/3
+]).
+
+%% Data backup
+-export([
+    import_config/1
+]).
+
+-include_lib("emqx/include/logger.hrl").
+
+-define(CONF_ROOT_KEY, listeners).
+-define(path(_Type_, _Name_), [?CONF_ROOT_KEY, _Type_, _Name_]).
+-define(OPTS, #{rawconf_with_defaults => true, override_to => cluster}).
+-define(IMPORT_OPTS, #{override_to => cluster}).
+
+action(Type, Name, Action, Conf) ->
+    wrap(emqx_conf:update(?path(Type, Name), {action, Action, Conf}, ?OPTS)).
+
+create(Type, Name, Conf) ->
+    wrap(emqx_conf:update(?path(Type, Name), {create, Conf}, ?OPTS)).
+
+ensure_remove(Type, Name) ->
+    wrap(emqx_conf:tombstone(?path(Type, Name), ?OPTS)).
+
+get_raw(Type, Name) -> emqx_conf:get_raw(?path(Type, Name), undefined).
+
+update(Type, Name, Conf) ->
+    wrap(emqx_conf:update(?path(Type, Name), {update, Conf}, ?OPTS)).
+
+wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason};
+wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason};
+wrap({error, Reason}) -> {error, Reason};
+wrap(Ok) -> Ok.
+
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+
+import_config(RawConf) ->
+    NewConf = maps:get(<<"listeners">>, RawConf, #{}),
+    OldConf = emqx:get_raw_config([?CONF_ROOT_KEY], #{}),
+    MergedConf = merge_confs(OldConf, NewConf),
+    case emqx_conf:update([?CONF_ROOT_KEY], MergedConf, ?IMPORT_OPTS) of
+        {ok, #{raw_config := NewRawConf}} ->
+            {ok, #{root_key => ?CONF_ROOT_KEY, changed => changed_paths(OldConf, NewRawConf)}};
+        Error ->
+            {error, #{root_key => ?CONF_ROOT_KEY, reason => Error}}
+    end.
+
+merge_confs(OldConf, NewConf) ->
+    AllTypes = maps:keys(maps:merge(OldConf, NewConf)),
+    lists:foldr(
+        fun(Type, Acc) ->
+            NewListeners = maps:get(Type, NewConf, #{}),
+            OldListeners = maps:get(Type, OldConf, #{}),
+            Acc#{Type => maps:merge(OldListeners, NewListeners)}
+        end,
+        #{},
+        AllTypes
+    ).
+
+changed_paths(OldRawConf, NewRawConf) ->
+    maps:fold(
+        fun(Type, Listeners, ChangedAcc) ->
+            OldListeners = maps:get(Type, OldRawConf, #{}),
+            Changed = maps:get(changed, emqx_utils_maps:diff_maps(Listeners, OldListeners)),
+            [?path(Type, K) || K <- maps:keys(Changed)] ++ ChangedAcc
+        end,
+        [],
+        NewRawConf
+    ).

+ 519 - 0
apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl

@@ -0,0 +1,519 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_data_backup_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(BOOTSTRAP_BACKUP, "emqx-export-test-bootstrap-ce.tar.gz").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    [application:load(App) || App <- apps_to_start() ++ apps_to_load()],
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_testcase(t_import_on_cluster, Config) ->
+    %% Don't import listeners to avoid port conflicts
+    %% when the same conf will be imported to another cluster
+    meck:new(emqx_mgmt_listeners_conf, [passthrough]),
+    meck:new(emqx_gateway_conf, [passthrough]),
+    meck:expect(
+        emqx_mgmt_listeners_conf,
+        import_config,
+        1,
+        {ok, #{changed => [], root_key => listeners}}
+    ),
+    meck:expect(
+        emqx_gateway_conf,
+        import_config,
+        1,
+        {ok, #{changed => [], root_key => gateway}}
+    ),
+    [{cluster, cluster(Config)} | setup(Config)];
+init_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
+    [{cluster, cluster(Config)} | setup(Config)];
+init_per_testcase(t_mnesia_bad_tab_schema, Config) ->
+    meck:new(emqx_mgmt_data_backup, [passthrough]),
+    meck:expect(emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]),
+    setup(Config);
+init_per_testcase(_TestCase, Config) ->
+    setup(Config).
+
+end_per_testcase(t_import_on_cluster, Config) ->
+    cleanup_cluster(?config(cluster, Config)),
+    cleanup(Config),
+    meck:unload(emqx_mgmt_listeners_conf),
+    meck:unload(emqx_gateway_conf);
+end_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
+    cleanup_cluster(?config(cluster, Config)),
+    cleanup(Config);
+end_per_testcase(t_mnesia_bad_tab_schema, Config) ->
+    cleanup(Config),
+    meck:unload(emqx_mgmt_data_backup);
+end_per_testcase(_TestCase, Config) ->
+    cleanup(Config).
+
+t_empty_export_import(_Config) ->
+    ExpRawConf = emqx:get_raw_config([]),
+    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
+    Exp = {ok, #{db_errors => #{}, config_errors => #{}}},
+    ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
+    ?assertEqual(ExpRawConf, emqx:get_raw_config([])),
+    %% idempotent update assert
+    ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
+    ?assertEqual(ExpRawConf, emqx:get_raw_config([])).
+
+t_cluster_hocon_export_import(Config) ->
+    RawConfBeforeImport = emqx:get_raw_config([]),
+    BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),
+    Exp = {ok, #{db_errors => #{}, config_errors => #{}}},
+    ?assertEqual(Exp, emqx_mgmt_data_backup:import(BootstrapFile)),
+    RawConfAfterImport = emqx:get_raw_config([]),
+    ?assertNotEqual(RawConfBeforeImport, RawConfAfterImport),
+    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
+    ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
+    ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])),
+    %% idempotent update assert
+    ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
+    ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])),
+    %% lookup file inside <data_dir>/backup
+    ?assertEqual(Exp, emqx_mgmt_data_backup:import(filename:basename(FileName))).
+
+t_ee_to_ce_backup(Config) ->
+    case emqx_release:edition() of
+        ce ->
+            EEBackupFileName = filename:join(?config(priv_dir, Config), "export-backup-ee.tar.gz"),
+            Meta = unicode:characters_to_binary(
+                hocon_pp:do(#{edition => ee, version => emqx_release:version()}, #{})
+            ),
+            ok = erl_tar:create(
+                EEBackupFileName,
+                [
+                    {"export-backup-ee/cluster.hocon", <<>>},
+                    {"export-backup-ee/META.hocon", Meta}
+                ],
+                [compressed]
+            ),
+            ExpReason = ee_to_ce_backup,
+            ?assertEqual(
+                {error, ExpReason}, emqx_mgmt_data_backup:import(EEBackupFileName)
+            ),
+            %% Must be translated to a readable string
+            ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpReason));
+        ee ->
+            %% Don't fail if the test is run with emqx-enterprise profile
+            ok
+    end.
+
+t_no_backup_file(_Config) ->
+    ExpReason = not_found,
+    ?assertEqual(
+        {error, not_found}, emqx_mgmt_data_backup:import("no_such_backup.tar.gz")
+    ),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpReason)).
+
+t_bad_backup_file(Config) ->
+    BadFileName = filename:join(?config(priv_dir, Config), "export-bad-backup-tar-gz"),
+    ok = file:write_file(BadFileName, <<>>),
+    NoMetaFileName = filename:join(?config(priv_dir, Config), "export-no-meta.tar.gz"),
+    ok = erl_tar:create(NoMetaFileName, [{"export-no-meta/cluster.hocon", <<>>}], [compressed]),
+    BadArchiveDirFileName = filename:join(?config(priv_dir, Config), "export-bad-dir.tar.gz"),
+    ok = erl_tar:create(
+        BadArchiveDirFileName,
+        [
+            {"tmp/cluster.hocon", <<>>},
+            {"export-bad-dir-inside/META.hocon", <<>>},
+            {"/export-bad-dir-inside/mnesia/test_tab", <<>>}
+        ],
+        [compressed]
+    ),
+    InvalidEditionFileName = filename:join(
+        ?config(priv_dir, Config), "export-invalid-edition.tar.gz"
+    ),
+    Meta = unicode:characters_to_binary(
+        hocon_pp:do(#{edition => "test", version => emqx_release:version()}, #{})
+    ),
+    ok = erl_tar:create(
+        InvalidEditionFileName,
+        [
+            {"export-invalid-edition/cluster.hocon", <<>>},
+            {"export-invalid-edition/META.hocon", Meta}
+        ],
+        [compressed]
+    ),
+    InvalidVersionFileName = filename:join(
+        ?config(priv_dir, Config), "export-invalid-version.tar.gz"
+    ),
+    Meta1 = unicode:characters_to_binary(
+        hocon_pp:do(#{edition => emqx_release:edition(), version => "test"}, #{})
+    ),
+    ok = erl_tar:create(
+        InvalidVersionFileName,
+        [
+            {"export-invalid-version/cluster.hocon", <<>>},
+            {"export-invalid-version/META.hocon", Meta1}
+        ],
+        [compressed]
+    ),
+    BadFileNameReason = bad_backup_name,
+    NoMetaReason = missing_backup_meta,
+    BadArchiveDirReason = bad_archive_dir,
+    InvalidEditionReason = invalid_edition,
+    InvalidVersionReason = invalid_version,
+    ?assertEqual({error, BadFileNameReason}, emqx_mgmt_data_backup:import(BadFileName)),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(BadFileNameReason)),
+    ?assertEqual({error, NoMetaReason}, emqx_mgmt_data_backup:import(NoMetaFileName)),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(NoMetaReason)),
+    ?assertEqual(
+        {error, BadArchiveDirReason},
+        emqx_mgmt_data_backup:import(BadArchiveDirFileName)
+    ),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(BadArchiveDirReason)),
+    ?assertEqual(
+        {error, InvalidEditionReason},
+        emqx_mgmt_data_backup:import(InvalidEditionFileName)
+    ),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(InvalidEditionReason)),
+    ?assertEqual(
+        {error, InvalidVersionReason},
+        emqx_mgmt_data_backup:import(InvalidVersionFileName)
+    ),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(InvalidVersionReason)).
+
+t_future_version(Config) ->
+    CurrentVersion = list_to_binary(emqx_release:version()),
+    [_, _ | Patch] = string:split(CurrentVersion, ".", all),
+    {ok, {MajorInt, MinorInt}} = emqx_mgmt_data_backup:parse_version_no_patch(CurrentVersion),
+    FutureMajorVersion = recompose_version(MajorInt + 1, MinorInt, Patch),
+    FutureMinorVersion = recompose_version(MajorInt, MinorInt + 1, Patch),
+    [MajorMeta, MinorMeta] =
+        [
+            unicode:characters_to_binary(
+                hocon_pp:do(#{edition => emqx_release:edition(), version => V}, #{})
+            )
+         || V <- [FutureMajorVersion, FutureMinorVersion]
+        ],
+    MajorFileName = filename:join(?config(priv_dir, Config), "export-future-major-ver.tar.gz"),
+    MinorFileName = filename:join(?config(priv_dir, Config), "export-future-minor-ver.tar.gz"),
+    ok = erl_tar:create(
+        MajorFileName,
+        [
+            {"export-future-major-ver/cluster.hocon", <<>>},
+            {"export-future-major-ver/META.hocon", MajorMeta}
+        ],
+        [compressed]
+    ),
+    ok = erl_tar:create(
+        MinorFileName,
+        [
+            {"export-future-minor-ver/cluster.hocon", <<>>},
+            {"export-future-minor-ver/META.hocon", MinorMeta}
+        ],
+        [compressed]
+    ),
+    ExpMajorReason = {unsupported_version, FutureMajorVersion},
+    ExpMinorReason = {unsupported_version, FutureMinorVersion},
+    ?assertEqual({error, ExpMajorReason}, emqx_mgmt_data_backup:import(MajorFileName)),
+    ?assertEqual({error, ExpMinorReason}, emqx_mgmt_data_backup:import(MinorFileName)),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpMajorReason)),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpMinorReason)).
+
+t_bad_config(Config) ->
+    BadConfigFileName = filename:join(?config(priv_dir, Config), "export-bad-config-backup.tar.gz"),
+    Meta = unicode:characters_to_binary(
+        hocon_pp:do(#{edition => emqx_release:edition(), version => emqx_release:version()}, #{})
+    ),
+    BadConfigMap = #{
+        <<"listeners">> =>
+            #{
+                <<"bad-type">> =>
+                    #{<<"bad-name">> => #{<<"bad-field">> => <<"bad-val">>}}
+            }
+    },
+    BadConfig = unicode:characters_to_binary(hocon_pp:do(BadConfigMap, #{})),
+    ok = erl_tar:create(
+        BadConfigFileName,
+        [
+            {"export-bad-config-backup/cluster.hocon", BadConfig},
+            {"export-bad-config-backup/META.hocon", Meta}
+        ],
+        [compressed]
+    ),
+    Res = emqx_mgmt_data_backup:import(BadConfigFileName),
+    ?assertMatch({error, #{kind := validation_error}}, Res).
+
+t_import_on_cluster(Config) ->
+    %% Randomly chosen config key to verify import result additionally
+    ?assertEqual([], emqx:get_config([authentication])),
+    BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),
+    ExpImportRes = {ok, #{db_errors => #{}, config_errors => #{}}},
+    ?assertEqual(ExpImportRes, emqx_mgmt_data_backup:import(BootstrapFile)),
+    ImportedAuthnConf = emqx:get_config([authentication]),
+    ?assertMatch([_ | _], ImportedAuthnConf),
+    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
+    {ok, Cwd} = file:get_cwd(),
+    AbsFilePath = filename:join(Cwd, FileName),
+    [CoreNode1, _CoreNode2, ReplicantNode] = NodesList = ?config(cluster, Config),
+    ReplImportReason = not_core_node,
+    ?assertEqual(
+        {error, ReplImportReason},
+        rpc:call(ReplicantNode, emqx_mgmt_data_backup, import, [AbsFilePath])
+    ),
+    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ReplImportReason)),
+    [?assertEqual([], rpc:call(N, emqx, get_config, [[authentication]])) || N <- NodesList],
+    ?assertEqual(
+        ExpImportRes,
+        rpc:call(CoreNode1, emqx_mgmt_data_backup, import, [AbsFilePath])
+    ),
+    [
+        ?assertEqual(
+            authn_ids(ImportedAuthnConf),
+            authn_ids(rpc:call(N, emqx, get_config, [[authentication]]))
+        )
+     || N <- NodesList
+    ].
+
+t_verify_imported_mnesia_tab_on_cluster(Config) ->
+    UsersToExport = users(<<"user_to_export_">>),
+    UsersBeforeImport = users(<<"user_before_import_">>),
+    [{ok, _} = emqx_dashboard_admin:add_user(U, U, U) || U <- UsersToExport],
+    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
+    {ok, Cwd} = file:get_cwd(),
+    AbsFilePath = filename:join(Cwd, FileName),
+
+    [CoreNode1, CoreNode2, ReplicantNode] = NodesList = ?config(cluster, Config),
+
+    [
+        {ok, _} = rpc:call(CoreNode1, emqx_dashboard_admin, add_user, [U, U, U])
+     || U <- UsersBeforeImport
+    ],
+
+    ?assertEqual(
+        {ok, #{db_errors => #{}, config_errors => #{}}},
+        rpc:call(CoreNode1, emqx_mgmt_data_backup, import, [AbsFilePath])
+    ),
+
+    [Tab] = emqx_dashboard_admin:backup_tables(),
+    AllUsers = lists:sort(mnesia:dirty_all_keys(Tab) ++ UsersBeforeImport),
+    [
+        ?assertEqual(
+            AllUsers,
+            lists:sort(rpc:call(N, mnesia, dirty_all_keys, [Tab]))
+        )
+     || N <- [CoreNode1, CoreNode2]
+    ],
+
+    %% Give some extra time to replicant to import data...
+    timer:sleep(3000),
+    ?assertEqual(AllUsers, lists:sort(rpc:call(ReplicantNode, mnesia, dirty_all_keys, [Tab]))),
+
+    [rpc:call(N, ekka, leave, []) || N <- lists:reverse(NodesList)],
+    [emqx_common_test_helpers:stop_slave(N) || N <- NodesList].
+
+t_mnesia_bad_tab_schema(_Config) ->
+    OldAttributes = [id, name, description],
+    ok = create_test_tab(OldAttributes),
+    ok = mria:dirty_write({data_backup_test, <<"id">>, <<"old_name">>, <<"old_description">>}),
+    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
+    {atomic, ok} = mnesia:delete_table(data_backup_test),
+    NewAttributes = [id, name, description, new_field],
+    ok = create_test_tab(NewAttributes),
+    NewRec =
+        {data_backup_test, <<"id">>, <<"new_name">>, <<"new_description">>, <<"new_field_value">>},
+    ok = mria:dirty_write(NewRec),
+    ?assertEqual(
+        {ok, #{
+            db_errors =>
+                #{data_backup_test => {error, {"Backup traversal failed", different_table_schema}}},
+            config_errors => #{}
+        }},
+        emqx_mgmt_data_backup:import(FileName)
+    ),
+    ?assertEqual([NewRec], mnesia:dirty_read(data_backup_test, <<"id">>)),
+    ?assertEqual([<<"id">>], mnesia:dirty_all_keys(data_backup_test)).
+
+t_read_files(_Config) ->
+    DataDir = emqx:data_dir(),
+    %% Relative "data" path is set in init_per_testcase/2, asserting it must be safe
+    ?assertEqual("data", DataDir),
+    {ok, Cwd} = file:get_cwd(),
+    AbsDataDir = filename:join(Cwd, DataDir),
+    FileBaseName = "t_read_files_tmp_file",
+    TestFileAbsPath = iolist_to_binary(filename:join(AbsDataDir, FileBaseName)),
+    TestFilePath = iolist_to_binary(filename:join(DataDir, FileBaseName)),
+    TestFileContent = <<"test_file_content">>,
+    ok = file:write_file(TestFileAbsPath, TestFileContent),
+
+    RawConf = #{
+        <<"test_rootkey">> => #{
+            <<"test_field">> => <<"test_field_path">>,
+            <<"abs_data_dir_path_file">> => TestFileAbsPath,
+            <<"rel_data_dir_path_file">> => TestFilePath,
+            <<"path_outside_data_dir">> => <<"/tmp/some-file">>
+        }
+    },
+
+    RawConf1 = emqx_utils_maps:deep_put(
+        [<<"test_rootkey">>, <<"abs_data_dir_path_file">>], RawConf, TestFileContent
+    ),
+    ExpectedConf = emqx_utils_maps:deep_put(
+        [<<"test_rootkey">>, <<"rel_data_dir_path_file">>], RawConf1, TestFileContent
+    ),
+    ?assertEqual(ExpectedConf, emqx_mgmt_data_backup:read_data_files(RawConf)).
+
+%%------------------------------------------------------------------------------
+%% Internal test helpers
+%%------------------------------------------------------------------------------
+
+setup(Config) ->
+    %% avoid port conflicts if the cluster is started
+    AppHandler = fun
+        (emqx_dashboard) ->
+            ok = emqx_config:put([dashboard, listeners, http, bind], 0);
+        (_) ->
+            ok
+    end,
+    ok = emqx_common_test_helpers:start_apps(apps_to_start(), AppHandler),
+    PrevDataDir = application:get_env(emqx, data_dir),
+    application:set_env(emqx, data_dir, "data"),
+    [{previous_emqx_data_dir, PrevDataDir} | Config].
+
+cleanup(Config) ->
+    emqx_common_test_helpers:stop_apps(apps_to_start()),
+    case ?config(previous_emqx_data_dir, Config) of
+        undefined ->
+            application:unset_env(emqx, data_dir);
+        {ok, Val} ->
+            application:set_env(emqx, data_dir, Val)
+    end.
+
+cleanup_cluster(ClusterNodes) ->
+    [rpc:call(N, ekka, leave, []) || N <- lists:reverse(ClusterNodes)],
+    [emqx_common_test_helpers:stop_slave(N) || N <- ClusterNodes].
+
+users(Prefix) ->
+    [
+        <<Prefix/binary, (integer_to_binary(abs(erlang:unique_integer())))/binary>>
+     || _ <- lists:seq(1, 10)
+    ].
+
+authn_ids(AuthnConf) ->
+    lists:sort([emqx_authentication:authenticator_id(Conf) || Conf <- AuthnConf]).
+
+recompose_version(MajorInt, MinorInt, Patch) ->
+    unicode:characters_to_list(
+        [integer_to_list(MajorInt + 1), $., integer_to_list(MinorInt), $. | Patch]
+    ).
+
+cluster(Config) ->
+    PrivDataDir = ?config(priv_dir, Config),
+    [{Core1, Core1Opts}, {Core2, Core2Opts}, {Replicant, ReplOpts}] =
+        emqx_common_test_helpers:emqx_cluster(
+            [
+                {core, data_backup_core1},
+                {core, data_backup_core2},
+                {replicant, data_backup_replicant}
+            ],
+            #{
+                priv_data_dir => PrivDataDir,
+                schema_mod => emqx_conf_schema,
+                apps => apps_to_start(),
+                load_apps => apps_to_start() ++ apps_to_load(),
+                env => [{mria, db_backend, rlog}],
+                load_schema => true,
+                start_autocluster => true,
+                join_to => true,
+                listener_ports => [],
+                conf => [{[dashboard, listeners, http, bind], 0}],
+                env_handler =>
+                    fun(_) ->
+                        application:set_env(emqx, boot_modules, [broker, router])
+                    end
+            }
+        ),
+    Node1 = emqx_common_test_helpers:start_slave(Core1, Core1Opts),
+    Node2 = emqx_common_test_helpers:start_slave(Core2, Core2Opts),
+    #{conf := _ReplConf, env := ReplEnv} = ReplOpts,
+    ClusterDiscovery = {static, [{seeds, [Node1, Node2]}]},
+    ReplOpts1 = maps:remove(
+        join_to,
+        ReplOpts#{
+            env => [{ekka, cluster_discovery, ClusterDiscovery} | ReplEnv],
+            env_handler => fun(_) ->
+                application:set_env(emqx, boot_modules, [broker, router]),
+                application:set_env(
+                    ekka,
+                    cluster_discovery,
+                    ClusterDiscovery
+                )
+            end
+        }
+    ),
+    ReplNode = emqx_common_test_helpers:start_slave(Replicant, ReplOpts1),
+    [Node1, Node2, ReplNode].
+
+create_test_tab(Attributes) ->
+    ok = mria:create_table(data_backup_test, [
+        {type, set},
+        {rlog_shard, data_backup_test_shard},
+        {storage, disc_copies},
+        {record_name, data_backup_test},
+        {attributes, Attributes},
+        {storage_properties, [
+            {ets, [
+                {read_concurrency, true},
+                {write_concurrency, true}
+            ]}
+        ]}
+    ]),
+    ok = mria:wait_for_tables([data_backup_test]).
+
+apps_to_start() ->
+    [
+        emqx,
+        emqx_conf,
+        emqx_psk,
+        emqx_management,
+        emqx_dashboard,
+        emqx_authz,
+        emqx_authn,
+        emqx_rule_engine,
+        emqx_retainer,
+        emqx_prometheus,
+        emqx_modules,
+        emqx_gateway,
+        emqx_exhook,
+        emqx_bridge,
+        emqx_auto_subscribe
+    ].
+
+apps_to_load() ->
+    [
+        emqx_gateway_lwm2m,
+        emqx_gateway_coap,
+        emqx_gateway_exproto,
+        emqx_gateway_stomp,
+        emqx_gateway_mqttsn
+    ].

BIN
apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-test-bootstrap-ce.tar.gz


+ 12 - 12
apps/emqx_modules/src/emqx_delayed.erl

@@ -98,9 +98,9 @@
 -define(FORMAT_FUN, {?MODULE, format_delayed}).
 -define(NOW, erlang:system_time(milli_seconds)).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Mnesia bootstrap
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 mnesia(boot) ->
     ok = mria:create_table(?TAB, [
         {type, ordered_set},
@@ -110,9 +110,9 @@ mnesia(boot) ->
         {attributes, record_info(fields, delayed_message)}
     ]).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Hooks
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 on_message_publish(
     Msg = #message{
         id = Id,
@@ -143,9 +143,9 @@ on_message_publish(
 on_message_publish(Msg) ->
     {ok, Msg}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Start delayed publish server
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -spec start_link() -> emqx_types:startlink_ret().
 start_link() ->
@@ -270,9 +270,9 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) ->
     Enable = maps:get(enable, NewConf, undefined),
     load_or_unload(Enable).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% gen_server callback
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 init([]) ->
     ok = mria:wait_for_tables([?TAB]),
@@ -335,9 +335,9 @@ terminate(_Reason, #{stats_timer := StatsTimer} = State) ->
 code_change(_Vsn, State, _Extra) ->
     {ok, State}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Telemetry
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -spec get_basic_usage_info() -> #{delayed_message_count => non_neg_integer()}.
 get_basic_usage_info() ->
@@ -348,9 +348,9 @@ get_basic_usage_info() ->
         end,
     #{delayed_message_count => DelayedCount}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Internal functions
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 %% Ensure the stats
 -spec ensure_stats_event(state()) -> state().

+ 11 - 7
apps/emqx_modules/src/emqx_rewrite.erl

@@ -49,9 +49,12 @@
 %% exported for `emqx_telemetry'
 -export([get_basic_usage_info/0]).
 
-%%--------------------------------------------------------------------
+-define(update(_Rules_),
+    emqx_conf:update([rewrite], _Rules_, #{override_to => cluster})
+).
+%%------------------------------------------------------------------------------
 %% Load/Unload
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 enable() ->
     emqx_conf:add_handler([rewrite], ?MODULE),
@@ -67,7 +70,7 @@ list() ->
     emqx_conf:get_raw([<<"rewrite">>], []).
 
 update(Rules0) ->
-    case emqx_conf:update([rewrite], Rules0, #{override_to => cluster}) of
+    case ?update(Rules0) of
         {ok, _} ->
             ok;
         {error, Reason} ->
@@ -109,18 +112,19 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) ->
     Binds = fill_client_binds(Message),
     {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Telemetry
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -spec get_basic_usage_info() -> #{topic_rewrite_rule_count => non_neg_integer()}.
 get_basic_usage_info() ->
     RewriteRules = list(),
     #{topic_rewrite_rule_count => length(RewriteRules)}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Internal functions
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+
 compile(Rules) ->
     lists:foldl(
         fun(Rule, {Publish, Subscribe, Error}) ->

+ 1 - 1
apps/emqx_psk/src/emqx_psk.app.src

@@ -2,7 +2,7 @@
 {application, emqx_psk, [
     {description, "EMQX PSK"},
     % strict semver, bump manually!
-    {vsn, "5.0.1"},
+    {vsn, "5.0.2"},
     {modules, []},
     {registered, [emqx_psk_sup]},
     {applications, [kernel, stdlib]},

+ 34 - 2
apps/emqx_psk/src/emqx_psk.erl

@@ -17,6 +17,8 @@
 -module(emqx_psk).
 
 -behaviour(gen_server).
+-behaviour(emqx_db_backup).
+-behaviour(emqx_config_backup).
 
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
@@ -48,6 +50,12 @@
     insert_psks/1
 ]).
 
+%% Data backup
+-export([
+    import_config/1,
+    backup_tables/0
+]).
+
 -record(psk_entry, {
     psk_id :: binary(),
     shared_secret :: binary(),
@@ -86,6 +94,12 @@ mnesia(boot) ->
         {storage_properties, [{ets, [{read_concurrency, true}]}]}
     ]).
 
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+
+backup_tables() -> [?TAB].
+
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
@@ -115,9 +129,27 @@ start_link() ->
 stop() ->
     gen_server:stop(?MODULE).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+%% Data backup
+%%------------------------------------------------------------------------------
+
+import_config(#{<<"psk_authentication">> := PskConf}) ->
+    case emqx_conf:update([psk_authentication], PskConf, #{override_to => cluster}) of
+        {ok, _} ->
+            case get_config(enable) of
+                true -> load();
+                false -> ok
+            end,
+            {ok, #{root_key => psk_authentication, changed => []}};
+        Error ->
+            {error, #{root_key => psk_authentication, reason => Error}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => psk_authentication, changed => []}}.
+
+%%------------------------------------------------------------------------------
 %% gen_server callbacks
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 init(_Opts) ->
     _ =

+ 10 - 10
apps/emqx_retainer/src/emqx_retainer.erl

@@ -82,9 +82,9 @@
 -callback clean(context()) -> ok.
 -callback size(context()) -> non_neg_integer().
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Hook API
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 -spec on_session_subscribed(_, _, emqx_types:subopts(), _) -> any().
 on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined ->
     ok;
@@ -118,9 +118,9 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
 on_message_publish(Msg, _) ->
     {ok, Msg}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% APIs
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 %% @doc Start the retainer
 -spec start_link() -> emqx_types:startlink_ret().
@@ -169,9 +169,9 @@ call(Req) ->
 stats_fun() ->
     gen_server:cast(?MODULE, ?FUNCTION_NAME).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% APIs
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}.
 get_basic_usage_info() ->
@@ -183,9 +183,9 @@ get_basic_usage_info() ->
             #{retained_messages => 0}
     end.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% gen_server callbacks
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 init([]) ->
     erlang:process_flag(trap_exit, true),
@@ -248,9 +248,9 @@ terminate(_Reason, #{clear_timer := ClearTimer}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Internal functions
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 -spec new_state() -> state().
 new_state() ->
     #{

+ 35 - 11
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -18,6 +18,7 @@
 
 -behaviour(gen_server).
 -behaviour(emqx_config_handler).
+-behaiour(emqx_config_backup).
 
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -78,6 +79,11 @@
     code_change/3
 ]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 -define(RULE_ENGINE, ?MODULE).
 
 -define(T_CALL, infinity).
@@ -105,7 +111,7 @@
 start_link() ->
     gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
 
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% The config handler for emqx_rule_engine
 %%------------------------------------------------------------------------------
 post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
@@ -142,9 +148,9 @@ post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRule
             {error, Error}
     end.
 
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% APIs for rules
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 -spec load_rules() -> ok.
 load_rules() ->
@@ -185,9 +191,9 @@ delete_rule(RuleId) when is_binary(RuleId) ->
 insert_rule(Rule) ->
     gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
 
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% Rule Management
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 -spec get_rules() -> [rule()].
 get_rules() ->
@@ -301,9 +307,9 @@ unload_hooks_for_rule(#{id := Id, from := Topics}) ->
         Topics
     ).
 
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% Telemetry helper functions
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 -spec get_basic_usage_info() ->
     #{
@@ -362,9 +368,27 @@ tally_referenced_bridges(BridgeIDs, Acc0) ->
         BridgeIDs
     ).
 
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
+%% Data backup
+%%----------------------------------------------------------------------------------------
+
+import_config(#{<<"rule_engine">> := #{<<"rules">> := NewRules} = RuleEngineConf}) ->
+    OldRules = emqx:get_raw_config(?KEY_PATH, #{}),
+    RuleEngineConf1 = RuleEngineConf#{<<"rules">> => maps:merge(OldRules, NewRules)},
+    case emqx_conf:update([rule_engine], RuleEngineConf1, #{override_to => cluster}) of
+        {ok, #{raw_config := #{<<"rules">> := NewRawRules}}} ->
+            Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawRules, OldRules)),
+            ChangedPaths = [?RULE_PATH(Id) || Id <- maps:keys(Changed)],
+            {ok, #{root_key => rule_engine, changed => ChangedPaths}};
+        Error ->
+            {error, #{root_key => rule_engine, reason => Error}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => rule_engine, changed => []}}.
+
+%%----------------------------------------------------------------------------------------
 %% gen_server callbacks
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 init([]) ->
     _TableId = ets:new(?KV_TAB, [
@@ -404,9 +428,9 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 %% Internal Functions
-%%------------------------------------------------------------------------------
+%%----------------------------------------------------------------------------------------
 
 parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) ->
     case emqx_rule_sqlparser:parse(Sql) of

+ 38 - 1
apps/emqx_utils/src/emqx_utils.erl

@@ -56,7 +56,8 @@
     safe_to_existing_atom/2,
     pub_props_to_packet/1,
     safe_filename/1,
-    diff_lists/3
+    diff_lists/3,
+    merge_lists/3
 ]).
 
 -export([
@@ -819,6 +820,42 @@ diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
         changed => lists:reverse(Changed)
     }.
 
+%% @doc Merges two lists preserving the original order of elements in both lists.
+%% KeyFunc must extract a unique key from each element.
+%% If two keys exist in both lists, the value in List1 is superseded by the value in List2, but
+%% the element position in the result list will equal its position in List1.
+%% Example:
+%%     emqx_utils:merge_append_lists(
+%%         [#{id => a, val => old}, #{id => b, val => old}],
+%%         [#{id => a, val => new}, #{id => c}, #{id => b, val => new}, #{id => d}],
+%%         fun(#{id := Id}) -> Id end).
+%%    [#{id => a,val => new},
+%%     #{id => b,val => new},
+%%     #{id => c},
+%%     #{id => d}]
+-spec merge_lists(list(T), list(T), KeyFunc) -> list(T) when
+    KeyFunc :: fun((T) -> any()),
+    T :: any().
+merge_lists(List1, List2, KeyFunc) ->
+    WithKeysList2 = lists:map(fun(E) -> {KeyFunc(E), E} end, List2),
+    WithKeysList1 = lists:map(
+        fun(E) ->
+            K = KeyFunc(E),
+            case lists:keyfind(K, 1, WithKeysList2) of
+                false -> {K, E};
+                WithKey1 -> WithKey1
+            end
+        end,
+        List1
+    ),
+    NewWithKeysList2 = lists:filter(
+        fun({K, _}) ->
+            not lists:keymember(K, 1, WithKeysList1)
+        end,
+        WithKeysList2
+    ),
+    [E || {_, E} <- WithKeysList1 ++ NewWithKeysList2].
+
 search(_ExpectValue, _KeyFunc, []) ->
     false;
 search(ExpectValue, KeyFunc, [Item | List]) ->

+ 4 - 0
changes/ce/feat-10676.en.md

@@ -0,0 +1,4 @@
+Implement configuration and user data import/export CLI.
+
+The `emqx ctl export` and `emqx ctl import` commands allow to export configuration and built-in database
+data from a running EMQX cluster and later import it to the same or another running EMQX cluster.

+ 50 - 3
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl

@@ -5,6 +5,7 @@
 
 -behaviour(gen_server).
 -behaviour(emqx_config_handler).
+-behaviour(emqx_config_backup).
 
 -include("emqx_ee_schema_registry.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -13,9 +14,7 @@
 %% API
 -export([
     start_link/0,
-
     get_serde/1,
-
     add_schema/2,
     get_schema/1,
     delete_schema/1,
@@ -34,6 +33,11 @@
 %% `emqx_config_handler' API
 -export([post_config_update/5]).
 
+%% Data backup
+-export([
+    import_config/1
+]).
+
 -type schema() :: #{
     type := serde_type(),
     source := binary(),
@@ -129,7 +133,50 @@ post_config_update(
         {error, Reason, SerdesToRollback} ->
             lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
             {error, Reason}
-    end.
+    end;
+post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, OldConf, _AppEnvs) ->
+    OldSchemas = maps:get(schemas, OldConf, #{}),
+    #{
+        added := Added,
+        changed := Changed0,
+        removed := Removed
+    } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas),
+    Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
+    RemovedNames = maps:keys(Removed),
+    case RemovedNames of
+        [] ->
+            ok;
+        _ ->
+            async_delete_serdes(RemovedNames)
+    end,
+    SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
+    case build_serdes(SchemasToBuild) of
+        ok ->
+            {ok, NewConf};
+        {error, Reason, SerdesToRollback} ->
+            lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
+            {error, Reason}
+    end;
+post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
+    {ok, NewConf}.
+
+%%-------------------------------------------------------------------------------------------------
+%% Data backup
+%%-------------------------------------------------------------------------------------------------
+
+import_config(#{<<"schema_registry">> := #{<<"schemas">> := Schemas} = SchemaRegConf}) ->
+    OldSchemas = emqx:get_raw_config([?CONF_KEY_ROOT, schemas], #{}),
+    SchemaRegConf1 = SchemaRegConf#{<<"schemas">> => maps:merge(OldSchemas, Schemas)},
+    case emqx_conf:update(?CONF_KEY_PATH, SchemaRegConf1, #{override_to => cluster}) of
+        {ok, #{raw_config := #{<<"schemas">> := NewRawSchemas}}} ->
+            Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawSchemas, OldSchemas)),
+            ChangedPaths = [[?CONF_KEY_ROOT, schemas, Name] || Name <- maps:keys(Changed)],
+            {ok, #{root_key => ?CONF_KEY_ROOT, changed => ChangedPaths}};
+        Error ->
+            {error, #{root_key => ?CONF_KEY_ROOT, reason => Error}}
+    end;
+import_config(_RawConf) ->
+    {ok, #{root_key => ?CONF_KEY_ROOT, changed => []}}.
 
 %%-------------------------------------------------------------------------------------------------
 %% `gen_server' API

+ 4 - 0
lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl

@@ -11,9 +11,13 @@
 
 start(_StartType, _StartArgs) ->
     ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
+    %% HTTP API handler
     emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),
+    %% Conf load / data import handler
+    emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
     emqx_ee_schema_registry_sup:start_link().
 
 stop(_State) ->
     emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']),
+    emqx_conf:remove_handler(?CONF_KEY_PATH),
     ok.

+ 31 - 0
lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl

@@ -700,3 +700,34 @@ t_cluster_serde_build(Config) ->
         ]
     ),
     ok.
+
+t_import_config(_Config) ->
+    RawConf = #{
+        <<"schema_registry">> =>
+            #{
+                <<"schemas">> =>
+                    #{
+                        <<"my_avro_schema">> =>
+                            #{
+                                <<"description">> => <<"My Avro Schema">>,
+                                <<"source">> =>
+                                    <<"{\"type\":\"record\",\"fields\":[{\"type\":\"int\",\"name\":\"i\"},{\"type\":\"string\",\"name\":\"s\"}]}">>,
+                                <<"type">> => <<"avro">>
+                            }
+                    }
+            }
+    },
+    RawConf1 = emqx_utils_maps:deep_put(
+        [<<"schema_registry">>, <<"schemas">>, <<"my_avro_schema">>, <<"description">>],
+        RawConf,
+        <<"Updated description">>
+    ),
+    Path = [schema_registry, schemas, <<"my_avro_schema">>],
+    ?assertEqual(
+        {ok, #{root_key => schema_registry, changed => []}},
+        emqx_ee_schema_registry:import_config(RawConf)
+    ),
+    ?assertEqual(
+        {ok, #{root_key => schema_registry, changed => [Path]}},
+        emqx_ee_schema_registry:import_config(RawConf1)
+    ).