Jelajahi Sumber

Merge pull request #12017 from SergeTupchiy/EMQX-11319-data-backup-http-api

EMQX-11319 data backup http api
SergeTupchiy 2 tahun lalu
induk
melakukan
3b1ae0f3df

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -39,6 +39,7 @@
 {emqx_mgmt_api_plugins,2}.
 {emqx_mgmt_api_plugins,2}.
 {emqx_mgmt_cluster,1}.
 {emqx_mgmt_cluster,1}.
 {emqx_mgmt_cluster,2}.
 {emqx_mgmt_cluster,2}.
+{emqx_mgmt_data_backup,1}.
 {emqx_mgmt_trace,1}.
 {emqx_mgmt_trace,1}.
 {emqx_mgmt_trace,2}.
 {emqx_mgmt_trace,2}.
 {emqx_node_rebalance,1}.
 {emqx_node_rebalance,1}.

+ 361 - 0
apps/emqx_management/src/emqx_mgmt_api_data_backup.erl

@@ -0,0 +1,361 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_api_data_backup).
+
+-behaviour(minirest_api).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-export([api_spec/0, paths/0, schema/1, fields/1]).
+
+-export([
+    data_export/2,
+    data_import/2,
+    data_files/2,
+    data_file_by_name/2
+]).
+
+-define(TAGS, [<<"Data Backup">>]).
+
+-define(BAD_REQUEST, 'BAD_REQUEST').
+-define(NOT_FOUND, 'NOT_FOUND').
+
+-define(node_field(IsRequired), ?node_field(IsRequired, #{})).
+-define(node_field(IsRequired, Meta),
+    {node, ?HOCON(binary(), Meta#{desc => "Node name", required => IsRequired})}
+).
+-define(filename_field(IsRequired), ?filename_field(IsRequired, #{})).
+-define(filename_field(IsRequired, Meta),
+    {filename,
+        ?HOCON(binary(), Meta#{
+            desc => "Data backup file name",
+            required => IsRequired
+        })}
+).
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/data/export",
+        "/data/import",
+        "/data/files",
+        "/data/files/:filename"
+    ].
+
+schema("/data/export") ->
+    #{
+        'operationId' => data_export,
+        post => #{
+            tags => ?TAGS,
+            desc => <<"Export a data backup file">>,
+            responses => #{
+                200 =>
+                    emqx_dashboard_swagger:schema_with_example(
+                        ?R_REF(backup_file_info),
+                        backup_file_info_example()
+                    )
+            }
+        }
+    };
+schema("/data/import") ->
+    #{
+        'operationId' => data_import,
+        post => #{
+            tags => ?TAGS,
+            desc => <<"Import a data backup file">>,
+            'requestBody' => emqx_dashboard_swagger:schema_with_example(
+                ?R_REF(import_request_body),
+                maps:with([node, filename], backup_file_info_example())
+            ),
+
+            responses => #{
+                204 => <<"No Content">>,
+                400 => emqx_dashboard_swagger:error_codes(
+                    [?BAD_REQUEST], <<"Backup file import failed">>
+                )
+            }
+        }
+    };
+schema("/data/files") ->
+    #{
+        'operationId' => data_files,
+        post => #{
+            tags => ?TAGS,
+            desc => <<"Upload a data backup file">>,
+            'requestBody' => emqx_dashboard_swagger:file_schema(filename),
+            responses => #{
+                204 => <<"No Content">>,
+                400 => emqx_dashboard_swagger:error_codes(
+                    [?BAD_REQUEST], <<"Bad backup file">>
+                )
+            }
+        },
+        get => #{
+            tags => ?TAGS,
+            desc => <<"List backup files">>,
+            parameters => [
+                ?R_REF(emqx_dashboard_swagger, page),
+                ?R_REF(emqx_dashboard_swagger, limit)
+            ],
+            responses => #{
+                200 =>
+                    emqx_dashboard_swagger:schema_with_example(
+                        ?R_REF(files_response),
+                        files_response_example()
+                    )
+            }
+        }
+    };
+schema("/data/files/:filename") ->
+    #{
+        'operationId' => data_file_by_name,
+        get => #{
+            tags => ?TAGS,
+            desc => <<"Download a data backup file">>,
+            parameters => [
+                ?filename_field(true, #{in => path}),
+                ?node_field(false, #{in => query})
+            ],
+            responses => #{
+                200 => ?HOCON(binary),
+                400 => emqx_dashboard_swagger:error_codes(
+                    [?BAD_REQUEST], <<"Bad request">>
+                ),
+                404 => emqx_dashboard_swagger:error_codes(
+                    [?NOT_FOUND], <<"Backup file not found">>
+                )
+            }
+        },
+        delete => #{
+            tags => ?TAGS,
+            desc => <<"Delete a data backup file">>,
+            parameters => [
+                ?filename_field(true, #{in => path}),
+                ?node_field(false, #{in => query})
+            ],
+            responses => #{
+                204 => <<"No Content">>,
+                400 => emqx_dashboard_swagger:error_codes(
+                    [?BAD_REQUEST], <<"Bad request">>
+                ),
+                404 => emqx_dashboard_swagger:error_codes(
+                    [?NOT_FOUND], <<"Backup file not found">>
+                )
+            }
+        }
+    }.
+
+fields(files_response) ->
+    [
+        {data, ?ARRAY(?R_REF(backup_file_info))},
+        {meta, ?R_REF(emqx_dashboard_swagger, meta)}
+    ];
+fields(backup_file_info) ->
+    [
+        ?node_field(true),
+        ?filename_field(true),
+        {created_at,
+            ?HOCON(binary(), #{
+                desc => "Data backup file creation date and time",
+                required => true
+            })}
+    ];
+fields(import_request_body) ->
+    [?node_field(false), ?filename_field(true)];
+fields(data_backup_file) ->
+    [
+        ?filename_field(true),
+        {file,
+            ?HOCON(binary(), #{
+                desc => "Data backup file content",
+                required => true
+            })}
+    ].
+
+%%------------------------------------------------------------------------------
+%% HTTP API Callbacks
+%%------------------------------------------------------------------------------
+
+data_export(post, _Request) ->
+    case emqx_mgmt_data_backup:export() of
+        {ok, #{filename := FileName} = File} ->
+            {200, File#{filename => filename:basename(FileName)}};
+        Error ->
+            Error
+    end.
+
+data_import(post, #{body := #{<<"filename">> := FileName} = Body}) ->
+    case safe_parse_node(Body) of
+        {error, Msg} ->
+            {400, #{code => 'BAD_REQUEST', message => Msg}};
+        FileNode ->
+            CoreNode = core_node(FileNode),
+            response(
+                emqx_mgmt_data_backup_proto_v1:import_file(CoreNode, FileNode, FileName, infinity)
+            )
+    end.
+
+core_node(FileNode) ->
+    case mria_rlog:role(FileNode) of
+        core ->
+            FileNode;
+        replicant ->
+            case mria_rlog:role() of
+                core ->
+                    node();
+                replicant ->
+                    mria_membership:coordinator()
+            end
+    end.
+
+data_files(post, #{body := #{<<"filename">> := #{type := _} = File}}) ->
+    [{FileName, FileContent} | _] = maps:to_list(maps:without([type], File)),
+    case emqx_mgmt_data_backup:upload(FileName, FileContent) of
+        ok ->
+            {204};
+        {error, Reason} ->
+            {400, #{code => 'BAD_REQUEST', message => emqx_mgmt_data_backup:format_error(Reason)}}
+    end;
+data_files(get, #{query_string := PageParams}) ->
+    case emqx_mgmt_api:parse_pager_params(PageParams) of
+        false ->
+            {400, #{code => ?BAD_REQUEST, message => <<"page_limit_invalid">>}};
+        #{page := Page, limit := Limit} = Pager ->
+            {200, #{data => list_backup_files(Page, Limit), meta => Pager}}
+    end.
+
+data_file_by_name(Method, #{bindings := #{filename := Filename}, query_string := QS}) ->
+    case safe_parse_node(QS) of
+        {error, Msg} ->
+            {400, #{code => 'BAD_REQUEST', message => Msg}};
+        Node ->
+            case get_or_delete_file(Method, Filename, Node) of
+                {error, not_found} ->
+                    {404, #{
+                        code => ?NOT_FOUND, message => emqx_mgmt_data_backup:format_error(not_found)
+                    }};
+                Other ->
+                    response(Other)
+            end
+    end.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+get_or_delete_file(get, Filename, Node) ->
+    emqx_mgmt_data_backup_proto_v1:read_file(Node, Filename, infinity);
+get_or_delete_file(delete, Filename, Node) ->
+    emqx_mgmt_data_backup_proto_v1:delete_file(Node, Filename, infinity).
+
+safe_parse_node(#{<<"node">> := NodeBin}) ->
+    NodesBin = [erlang:atom_to_binary(N, utf8) || N <- emqx:running_nodes()],
+    case lists:member(NodeBin, NodesBin) of
+        true -> erlang:binary_to_atom(NodeBin, utf8);
+        false -> {error, io_lib:format("Unknown node: ~s", [NodeBin])}
+    end;
+safe_parse_node(_) ->
+    node().
+
+response({ok, #{db_errors := DbErrs, config_errors := ConfErrs}}) ->
+    case DbErrs =:= #{} andalso ConfErrs =:= #{} of
+        true ->
+            {204};
+        false ->
+            DbErrs1 = emqx_mgmt_data_backup:format_db_errors(DbErrs),
+            ConfErrs1 = emqx_mgmt_data_backup:format_conf_errors(ConfErrs),
+            Msg = unicode:characters_to_binary(io_lib:format("~s", [DbErrs1 ++ ConfErrs1])),
+            {400, #{code => ?BAD_REQUEST, message => Msg}}
+    end;
+response({ok, Res}) ->
+    {200, Res};
+response(ok) ->
+    {204};
+response({error, Reason}) ->
+    {400, #{code => ?BAD_REQUEST, message => emqx_mgmt_data_backup:format_error(Reason)}}.
+
+list_backup_files(Page, Limit) ->
+    Start = Page * Limit - Limit + 1,
+    lists:sublist(list_backup_files(), Start, Limit).
+
+list_backup_files() ->
+    Nodes = emqx:running_nodes(),
+    Results = emqx_mgmt_data_backup_proto_v1:list_files(Nodes, 30_0000),
+    NodeResults = lists:zip(Nodes, Results),
+    {Successes, Failures} =
+        lists:partition(
+            fun({_Node, Result}) ->
+                case Result of
+                    {ok, _} -> true;
+                    _ -> false
+                end
+            end,
+            NodeResults
+        ),
+    case Failures of
+        [] ->
+            ok;
+        [_ | _] ->
+            ?SLOG(error, #{msg => "list_exported_backup_files_failed", node_errors => Failures})
+    end,
+    FileList = [FileInfo || {_Node, {ok, FileInfos}} <- Successes, FileInfo <- FileInfos],
+    lists:sort(
+        fun(#{created_at_sec := T1, filename := F1}, #{created_at_sec := T2, filename := F2}) ->
+            case T1 =:= T2 of
+                true -> F1 >= F2;
+                false -> T1 > T2
+            end
+        end,
+        FileList
+    ).
+
+backup_file_info_example() ->
+    #{
+        created_at => <<"2023-11-23T19:13:19+02:00">>,
+        created_at_sec => 1700759599,
+        filename => <<"emqx-export-2023-11-23-19-13-19.043.tar.gz">>,
+        node => 'emqx@127.0.0.1',
+        size => 22740
+    }.
+
+files_response_example() ->
+    #{
+        data => [
+            #{
+                created_at => <<"2023-09-02T11:11:33+02:00">>,
+                created_at_sec => 1693645893,
+                filename => <<"emqx-export-2023-09-02-11-11-33.012.tar.gz">>,
+                node => 'emqx@127.0.0.1',
+                size => 22740
+            },
+            #{
+                created_at => <<"2023-11-23T19:13:19+02:00">>,
+                created_at_sec => 1700759599,
+                filename => <<"emqx-export-2023-11-23-19-13-19.043.tar.gz">>,
+                node => 'emqx@127.0.0.1',
+                size => 22740
+            }
+        ],
+        meta => #{
+            page => 0,
+            limit => 20,
+            count => 300
+        }
+    }.

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -775,7 +775,7 @@ data(["import", Filename]) ->
             emqx_ctl:print("Data has been imported successfully.~n");
             emqx_ctl:print("Data has been imported successfully.~n");
         {ok, _} ->
         {ok, _} ->
             emqx_ctl:print(
             emqx_ctl:print(
-                "Data has been imported, but some errors occurred, see the the log above.~n"
+                "Data has been imported, but some errors occurred, see the log above.~n"
             );
             );
         {error, Reason} ->
         {error, Reason} ->
             Reason1 = emqx_mgmt_data_backup:format_error(Reason),
             Reason1 = emqx_mgmt_data_backup:format_error(Reason),

+ 181 - 30
apps/emqx_management/src/emqx_mgmt_data_backup.erl

@@ -24,8 +24,21 @@
     format_error/1
     format_error/1
 ]).
 ]).
 
 
+%% HTTP API
+-export([
+    upload/2,
+    maybe_copy_and_import/2,
+    read_file/1,
+    delete_file/1,
+    list_files/0,
+    format_conf_errors/1,
+    format_db_errors/1
+]).
+
 -export([default_validate_mnesia_backup/1]).
 -export([default_validate_mnesia_backup/1]).
 
 
+-export_type([import_res/0]).
+
 -ifdef(TEST).
 -ifdef(TEST).
 -compile(export_all).
 -compile(export_all).
 -compile(nowarn_export_all).
 -compile(nowarn_export_all).
@@ -80,17 +93,21 @@
         end
         end
     end()
     end()
 ).
 ).
+-define(backup_path(_FileName_), filename:join(root_backup_dir(), _FileName_)).
 
 
 -type backup_file_info() :: #{
 -type backup_file_info() :: #{
-    filename => binary(),
-    size => non_neg_integer(),
-    created_at => binary(),
-    node => node(),
+    filename := binary(),
+    size := non_neg_integer(),
+    created_at := binary(),
+    created_at_sec := integer(),
+    node := node(),
     atom() => _
     atom() => _
 }.
 }.
 
 
 -type db_error_details() :: #{mria:table() => {error, _}}.
 -type db_error_details() :: #{mria:table() => {error, _}}.
 -type config_error_details() :: #{emqx_utils_maps:config_path() => {error, _}}.
 -type config_error_details() :: #{emqx_utils_maps:config_path() => {error, _}}.
+-type import_res() ::
+    {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} | {error, _}.
 
 
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% APIs
 %% APIs
@@ -120,15 +137,11 @@ export(Opts) ->
         file:del_dir_r(BackupName)
         file:del_dir_r(BackupName)
     end.
     end.
 
 
--spec import(file:filename_all()) ->
-    {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
-    | {error, _}.
+-spec import(file:filename_all()) -> import_res().
 import(BackupFileName) ->
 import(BackupFileName) ->
     import(BackupFileName, ?DEFAULT_OPTS).
     import(BackupFileName, ?DEFAULT_OPTS).
 
 
--spec import(file:filename_all(), map()) ->
-    {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
-    | {error, _}.
+-spec import(file:filename_all(), map()) -> import_res().
 import(BackupFileName, Opts) ->
 import(BackupFileName, Opts) ->
     case is_import_allowed() of
     case is_import_allowed() of
         true ->
         true ->
@@ -142,6 +155,74 @@ import(BackupFileName, Opts) ->
             {error, not_core_node}
             {error, not_core_node}
     end.
     end.
 
 
+-spec maybe_copy_and_import(node(), file:filename_all()) -> import_res().
+maybe_copy_and_import(FileNode, BackupFileName) when FileNode =:= node() ->
+    import(BackupFileName, #{});
+maybe_copy_and_import(FileNode, BackupFileName) ->
+    %% The file can be already present locally
+    case filelib:is_file(?backup_path(str(BackupFileName))) of
+        true ->
+            import(BackupFileName, #{});
+        false ->
+            copy_and_import(FileNode, BackupFileName)
+    end.
+
+-spec read_file(file:filename_all()) ->
+    {ok, #{filename => file:filename_all(), file => binary()}} | {error, _}.
+read_file(BackupFileName) ->
+    BackupFileNameStr = str(BackupFileName),
+    case validate_backup_name(BackupFileNameStr) of
+        ok ->
+            maybe_not_found(file:read_file(?backup_path(BackupFileName)));
+        Err ->
+            Err
+    end.
+
+-spec delete_file(file:filename_all()) -> ok | {error, _}.
+delete_file(BackupFileName) ->
+    BackupFileNameStr = str(BackupFileName),
+    case validate_backup_name(BackupFileNameStr) of
+        ok ->
+            maybe_not_found(file:delete(?backup_path(BackupFileName)));
+        Err ->
+            Err
+    end.
+
+-spec upload(file:filename_all(), binary()) -> ok | {error, _}.
+upload(BackupFileName, BackupFileContent) ->
+    BackupFileNameStr = str(BackupFileName),
+    FilePath = ?backup_path(BackupFileNameStr),
+    case filelib:is_file(FilePath) of
+        true ->
+            {error, {already_exists, BackupFileNameStr}};
+        false ->
+            do_upload(BackupFileNameStr, BackupFileContent)
+    end.
+
+-spec list_files() -> [backup_file_info()].
+list_files() ->
+    Filter =
+        fun(File) ->
+            case file:read_file_info(File, [{time, posix}]) of
+                {ok, #file_info{size = Size, ctime = CTimeSec}} ->
+                    BaseFilename = bin(filename:basename(File)),
+                    Info = #{
+                        filename => BaseFilename,
+                        size => Size,
+                        created_at => emqx_utils_calendar:epoch_to_rfc3339(CTimeSec, second),
+                        created_at_sec => CTimeSec,
+                        node => node()
+                    },
+                    {true, Info};
+                _ ->
+                    false
+            end
+        end,
+    lists:filtermap(Filter, backup_files()).
+
+backup_files() ->
+    filelib:wildcard(?backup_path("*" ++ ?TAR_SUFFIX)).
+
 format_error(not_core_node) ->
 format_error(not_core_node) ->
     str(
     str(
         io_lib:format(
         io_lib:format(
@@ -170,13 +251,83 @@ format_error({unsupported_version, ImportVersion}) ->
             [str(ImportVersion), str(emqx_release:version())]
             [str(ImportVersion), str(emqx_release:version())]
         )
         )
     );
     );
+format_error({already_exists, BackupFileName}) ->
+    str(io_lib:format("Backup file \"~s\" already exists", [BackupFileName]));
 format_error(Reason) ->
 format_error(Reason) ->
     Reason.
     Reason.
 
 
+format_conf_errors(Errors) ->
+    Opts = #{print_fun => fun io_lib:format/2},
+    maps:values(maps:map(conf_error_formatter(Opts), Errors)).
+
+format_db_errors(Errors) ->
+    Opts = #{print_fun => fun io_lib:format/2},
+    maps:values(
+        maps:map(
+            fun(Tab, Err) -> maybe_print_mnesia_import_err(Tab, Err, Opts) end,
+            Errors
+        )
+    ).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
+copy_and_import(FileNode, BackupFileName) ->
+    case emqx_mgmt_data_backup_proto_v1:read_file(FileNode, BackupFileName, infinity) of
+        {ok, BackupFileContent} ->
+            case upload(BackupFileName, BackupFileContent) of
+                ok ->
+                    import(BackupFileName, #{});
+                Err ->
+                    Err
+            end;
+        Err ->
+            Err
+    end.
+
+%% compatibility with import API that uses lookup_file/1 and returns `not_found` reason
+maybe_not_found({error, enoent}) ->
+    {error, not_found};
+maybe_not_found(Other) ->
+    Other.
+
+do_upload(BackupFileNameStr, BackupFileContent) ->
+    FilePath = ?backup_path(BackupFileNameStr),
+    BackupDir = ?backup_path(filename:basename(BackupFileNameStr, ?TAR_SUFFIX)),
+    try
+        ok = validate_backup_name(BackupFileNameStr),
+        ok = file:write_file(FilePath, BackupFileContent),
+        ok = extract_backup(FilePath),
+        {ok, _} = validate_backup(BackupDir),
+        HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
+        case filelib:is_regular(HoconFileName) of
+            true ->
+                {ok, RawConf} = hocon:files([HoconFileName]),
+                RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf),
+                {ok, _} = validate_cluster_hocon(RawConf1),
+                ok;
+            false ->
+                %% cluster.hocon can be missing in the backup
+                ok
+        end,
+        ?SLOG(info, #{msg => "emqx_data_upload_success"})
+    catch
+        error:{badmatch, {error, Reason}}:Stack ->
+            ?SLOG(error, #{msg => "emqx_data_upload_failed", reason => Reason, stacktrace => Stack}),
+            {error, Reason};
+        Class:Reason:Stack ->
+            ?SLOG(error, #{
+                msg => "emqx_data_upload_failed",
+                exception => Class,
+                reason => Reason,
+                stacktrace => Stack
+            }),
+            {error, Reason}
+    after
+        file:del_dir_r(BackupDir)
+    end.
+
 prepare_new_backup(Opts) ->
 prepare_new_backup(Opts) ->
     Ts = erlang:system_time(millisecond),
     Ts = erlang:system_time(millisecond),
     {{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts),
     {{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts),
@@ -186,7 +337,7 @@ prepare_new_backup(Opts) ->
             [Y, M, D, HH, MM, SS, Ts rem 1000]
             [Y, M, D, HH, MM, SS, Ts rem 1000]
         )
         )
     ),
     ),
-    BackupName = filename:join(root_backup_dir(), BackupBaseName),
+    BackupName = ?backup_path(BackupBaseName),
     BackupTarName = ?tar(BackupName),
     BackupTarName = ?tar(BackupName),
     maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts),
     maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts),
     {ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])),
     {ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])),
@@ -208,13 +359,13 @@ do_export(BackupName, TarDescriptor, Opts) ->
     ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)),
     ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)),
     {ok, #file_info{
     {ok, #file_info{
         size = Size,
         size = Size,
-        ctime = {{Y1, M1, D1}, {H1, MM1, S1}}
-    }} = file:read_file_info(BackupTarName),
-    CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]),
+        ctime = CTime
+    }} = file:read_file_info(BackupTarName, [{time, posix}]),
     {ok, #{
     {ok, #{
         filename => bin(BackupTarName),
         filename => bin(BackupTarName),
         size => Size,
         size => Size,
-        created_at => bin(CreatedAt),
+        created_at => emqx_utils_calendar:epoch_to_rfc3339(CTime, second),
+        created_at_sec => CTime,
         node => node()
         node => node()
     }}.
     }}.
 
 
@@ -351,7 +502,7 @@ parse_version_no_patch(VersionBin) ->
     end.
     end.
 
 
 do_import(BackupFileName, Opts) ->
 do_import(BackupFileName, Opts) ->
-    BackupDir = filename:join(root_backup_dir(), filename:basename(BackupFileName, ?TAR_SUFFIX)),
+    BackupDir = ?backup_path(filename:basename(BackupFileName, ?TAR_SUFFIX)),
     maybe_print("Importing data from ~p...~n", [BackupFileName], Opts),
     maybe_print("Importing data from ~p...~n", [BackupFileName], Opts),
     try
     try
         ok = validate_backup_name(BackupFileName),
         ok = validate_backup_name(BackupFileName),
@@ -619,7 +770,7 @@ validate_cluster_hocon(RawConf) ->
 
 
 do_import_conf(RawConf, Opts) ->
 do_import_conf(RawConf, Opts) ->
     GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
     GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
-    maybe_print_errors(GenConfErrs, Opts),
+    maybe_print_conf_errors(GenConfErrs, Opts),
     Errors =
     Errors =
         lists:foldl(
         lists:foldl(
             fun(Module, ErrorsAcc) ->
             fun(Module, ErrorsAcc) ->
@@ -634,7 +785,7 @@ do_import_conf(RawConf, Opts) ->
             GenConfErrs,
             GenConfErrs,
             sort_importer_modules(find_behaviours(emqx_config_backup))
             sort_importer_modules(find_behaviours(emqx_config_backup))
         ),
         ),
-    maybe_print_errors(Errors, Opts),
+    maybe_print_conf_errors(Errors, Opts),
     Errors.
     Errors.
 
 
 sort_importer_modules(Modules) ->
 sort_importer_modules(Modules) ->
@@ -677,17 +828,17 @@ maybe_print_changed(Changed, Opts) ->
         Changed
         Changed
     ).
     ).
 
 
-maybe_print_errors(Errors, Opts) ->
-    maps:foreach(
-        fun(Path, Err) ->
-            maybe_print(
-                "Failed to import the following config path: ~p, reason: ~p~n",
-                [pretty_path(Path), Err],
-                Opts
-            )
-        end,
-        Errors
-    ).
+maybe_print_conf_errors(Errors, Opts) ->
+    maps:foreach(conf_error_formatter(Opts), Errors).
+
+conf_error_formatter(Opts) ->
+    fun(Path, Err) ->
+        maybe_print(
+            "Failed to import the following config path: ~p, reason: ~p~n",
+            [pretty_path(Path), Err],
+            Opts
+        )
+    end.
 
 
 filter_errors(Results) ->
 filter_errors(Results) ->
     maps:filter(
     maps:filter(
@@ -727,7 +878,7 @@ lookup_file(FileName) ->
             %% Only lookup by basename, don't allow to lookup by file path
             %% Only lookup by basename, don't allow to lookup by file path
             case FileName =:= filename:basename(FileName) of
             case FileName =:= filename:basename(FileName) of
                 true ->
                 true ->
-                    FilePath = filename:join(root_backup_dir(), FileName),
+                    FilePath = ?backup_path(FileName),
                     case filelib:is_file(FilePath) of
                     case filelib:is_file(FilePath) of
                         true -> {ok, FilePath};
                         true -> {ok, FilePath};
                         false -> {error, not_found}
                         false -> {error, not_found}

+ 51 - 0
apps/emqx_management/src/proto/emqx_mgmt_data_backup_proto_v1.erl

@@ -0,0 +1,51 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_data_backup_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    import_file/4,
+    list_files/2,
+    read_file/3,
+    delete_file/3
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.4.0".
+
+-spec list_files([node()], timeout()) ->
+    emqx_rpc:erpc_multicall({non_neg_integer(), map()}).
+list_files(Nodes, Timeout) ->
+    erpc:multicall(Nodes, emqx_mgmt_data_backup, list_files, [], Timeout).
+
+-spec import_file(node(), node(), binary(), timeout()) ->
+    emqx_mgmt_data_backup:import_res() | {badrpc, _}.
+import_file(Node, FileNode, FileName, Timeout) ->
+    rpc:call(Node, emqx_mgmt_data_backup, maybe_copy_and_import, [FileNode, FileName], Timeout).
+
+-spec read_file(node(), binary(), timeout()) ->
+    {ok, binary()} | {error, _} | {bardrpc, _}.
+read_file(Node, FileName, Timeout) ->
+    rpc:call(Node, emqx_mgmt_data_backup, read_file, [FileName], Timeout).
+
+-spec delete_file(node(), binary(), timeout()) -> ok | {error, _} | {bardrpc, _}.
+delete_file(Node, FileName, Timeout) ->
+    rpc:call(Node, emqx_mgmt_data_backup, delete_file, [FileName], Timeout).

+ 355 - 0
apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl

@@ -0,0 +1,355 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_api_data_backup_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(NODE1_PORT, 18085).
+-define(NODE2_PORT, 18086).
+-define(NODE3_PORT, 18087).
+-define(api_base_url(_Port_), ("http://127.0.0.1:" ++ (integer_to_list(_Port_)))).
+
+-define(UPLOAD_EE_BACKUP, "emqx-export-upload-ee.tar.gz").
+-define(UPLOAD_CE_BACKUP, "emqx-export-upload-ce.tar.gz").
+-define(BAD_UPLOAD_BACKUP, "emqx-export-bad-upload.tar.gz").
+-define(BAD_IMPORT_BACKUP, "emqx-export-bad-file.tar.gz").
+-define(backup_path(_Config_, _BackupName_),
+    filename:join(?config(data_dir, _Config_), _BackupName_)
+).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_) ->
+    ok.
+
+init_per_testcase(TC, Config) when
+    TC =:= t_upload_ee_backup;
+    TC =:= t_import_ee_backup
+->
+    case emqx_release:edition() of
+        ee -> do_init_per_testcase(TC, Config);
+        ce -> Config
+    end;
+init_per_testcase(TC, Config) ->
+    do_init_per_testcase(TC, Config).
+
+end_per_testcase(_TC, Config) ->
+    case ?config(cluster, Config) of
+        undefined -> ok;
+        Cluster -> emqx_cth_cluster:stop(Cluster)
+    end.
+
+t_export_backup(Config) ->
+    Auth = ?config(auth, Config),
+    export_test(?NODE1_PORT, Auth),
+    export_test(?NODE2_PORT, Auth),
+    export_test(?NODE3_PORT, Auth).
+
+t_delete_backup(Config) ->
+    test_file_op(delete, Config).
+
+t_get_backup(Config) ->
+    test_file_op(get, Config).
+
+t_list_backups(Config) ->
+    Auth = ?config(auth, Config),
+
+    [{ok, _} = export_backup(?NODE1_PORT, Auth) || _ <- lists:seq(1, 10)],
+    [{ok, _} = export_backup(?NODE2_PORT, Auth) || _ <- lists:seq(1, 10)],
+
+    {ok, RespBody} = list_backups(?NODE1_PORT, Auth, <<"1">>, <<"100">>),
+    #{<<"data">> := Data, <<"meta">> := _} = emqx_utils_json:decode(RespBody),
+    ?assertEqual(20, length(Data)),
+
+    {ok, EmptyRespBody} = list_backups(?NODE2_PORT, Auth, <<"2">>, <<"100">>),
+    #{<<"data">> := EmptyData, <<"meta">> := _} = emqx_utils_json:decode(EmptyRespBody),
+    ?assertEqual(0, length(EmptyData)),
+
+    {ok, RespBodyP1} = list_backups(?NODE3_PORT, Auth, <<"1">>, <<"10">>),
+    {ok, RespBodyP2} = list_backups(?NODE3_PORT, Auth, <<"2">>, <<"10">>),
+    {ok, RespBodyP3} = list_backups(?NODE3_PORT, Auth, <<"3">>, <<"10">>),
+
+    #{<<"data">> := DataP1, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP1),
+    ?assertEqual(10, length(DataP1)),
+    #{<<"data">> := DataP2, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP2),
+    ?assertEqual(10, length(DataP2)),
+    #{<<"data">> := DataP3, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP3),
+    ?assertEqual(0, length(DataP3)),
+
+    ?assertEqual(Data, DataP1 ++ DataP2).
+
+t_upload_ce_backup(Config) ->
+    upload_backup_test(Config, ?UPLOAD_CE_BACKUP).
+
+t_upload_ee_backup(Config) ->
+    case emqx_release:edition() of
+        ee -> upload_backup_test(Config, ?UPLOAD_EE_BACKUP);
+        ce -> ok
+    end.
+
+t_import_ce_backup(Config) ->
+    import_backup_test(Config, ?UPLOAD_CE_BACKUP).
+
+t_import_ee_backup(Config) ->
+    case emqx_release:edition() of
+        ee -> import_backup_test(Config, ?UPLOAD_EE_BACKUP);
+        ce -> ok
+    end.
+
+do_init_per_testcase(TC, Config) ->
+    Cluster = [Core1, _Core2, Repl] = cluster(TC, Config),
+    Auth = auth_header(Core1),
+    ok = wait_for_auth_replication(Repl),
+    [{auth, Auth}, {cluster, Cluster} | Config].
+
+test_file_op(Method, Config) ->
+    Auth = ?config(auth, Config),
+
+    {ok, Node1Resp} = export_backup(?NODE1_PORT, Auth),
+    {ok, Node2Resp} = export_backup(?NODE2_PORT, Auth),
+    {ok, Node3Resp} = export_backup(?NODE3_PORT, Auth),
+
+    ParsedResps = [emqx_utils_json:decode(R) || R <- [Node1Resp, Node2Resp, Node3Resp]],
+
+    [Node1Parsed, Node2Parsed, Node3Parsed] = ParsedResps,
+
+    %% node param is not set in Query, expect get/delete the backup on the local node
+    F1 = fun() ->
+        backup_file_op(Method, ?NODE1_PORT, Auth, maps:get(<<"filename">>, Node1Parsed), [])
+    end,
+    ?assertMatch({ok, _}, F1()),
+    assert_second_call(Method, F1()),
+
+    %% Node 2 must get/delete the backup on Node 3 via rpc
+    F2 = fun() ->
+        backup_file_op(
+            Method,
+            ?NODE2_PORT,
+            Auth,
+            maps:get(<<"filename">>, Node3Parsed),
+            [{<<"node">>, maps:get(<<"node">>, Node3Parsed)}]
+        )
+    end,
+    ?assertMatch({ok, _}, F2()),
+    assert_second_call(Method, F2()),
+
+    %% The same as above but nodes are switched
+    F3 = fun() ->
+        backup_file_op(
+            Method,
+            ?NODE3_PORT,
+            Auth,
+            maps:get(<<"filename">>, Node2Parsed),
+            [{<<"node">>, maps:get(<<"node">>, Node2Parsed)}]
+        )
+    end,
+    ?assertMatch({ok, _}, F3()),
+    assert_second_call(Method, F3()).
+
+export_test(NodeApiPort, Auth) ->
+    {ok, RespBody} = export_backup(NodeApiPort, Auth),
+    #{
+        <<"created_at">> := _,
+        <<"created_at_sec">> := CreatedSec,
+        <<"filename">> := _,
+        <<"node">> := _,
+        <<"size">> := Size
+    } = emqx_utils_json:decode(RespBody),
+    ?assert(is_integer(Size)),
+    ?assert(is_integer(CreatedSec) andalso CreatedSec > 0).
+
+upload_backup_test(Config, BackupName) ->
+    Auth = ?config(auth, Config),
+    UploadFile = ?backup_path(Config, BackupName),
+    BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP),
+    BadUploadFile = ?backup_path(Config, ?BAD_UPLOAD_BACKUP),
+
+    ?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)),
+    %% This file was specially forged to pass upload validation bat fail on import
+    ?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)),
+    ?assertEqual({error, bad_request}, upload_backup(?NODE1_PORT, Auth, BadUploadFile)).
+
+import_backup_test(Config, BackupName) ->
+    Auth = ?config(auth, Config),
+    UploadFile = ?backup_path(Config, BackupName),
+    BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP),
+
+    ?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)),
+
+    %% This file was specially forged to pass upload validation bat fail on import
+    ?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)),
+
+    %% Replicant node must be able to import the file by doing rpc to a core node
+    ?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)),
+
+    [N1, N2, N3] = ?config(cluster, Config),
+
+    ?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)),
+
+    ?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N3)),
+    %% Now this node must also have the file locally
+    ?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N1)),
+
+    ?assertMatch({error, {_, 400, _}}, import_backup(?NODE2_PORT, Auth, ?BAD_IMPORT_BACKUP, N2)).
+
+assert_second_call(get, Res) ->
+    ?assertMatch({ok, _}, Res);
+assert_second_call(delete, Res) ->
+    ?assertMatch({error, {_, 404, _}}, Res).
+
+export_backup(NodeApiPort, Auth) ->
+    Path = ["data", "export"],
+    request(post, NodeApiPort, Path, Auth).
+
+import_backup(NodeApiPort, Auth, BackupName) ->
+    import_backup(NodeApiPort, Auth, BackupName, undefined).
+
+import_backup(NodeApiPort, Auth, BackupName, Node) ->
+    Path = ["data", "import"],
+    Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)},
+    Body1 =
+        case Node of
+            undefined -> Body;
+            _ -> Body#{<<"node">> => Node}
+        end,
+    request(post, NodeApiPort, Path, Body1, Auth).
+
+list_backups(NodeApiPort, Auth, Page, Limit) ->
+    Path = ["data", "files"],
+    request(get, NodeApiPort, Path, [{<<"page">>, Page}, {<<"limit">>, Limit}], [], Auth).
+
+backup_file_op(Method, NodeApiPort, Auth, BackupName, QueryList) ->
+    Path = ["data", "files", BackupName],
+    request(Method, NodeApiPort, Path, QueryList, [], Auth).
+
+upload_backup(NodeApiPort, Auth, BackupFilePath) ->
+    Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodeApiPort), ["data", "files"]),
+    Res = emqx_mgmt_api_test_util:upload_request(
+        Path,
+        BackupFilePath,
+        "filename",
+        <<"application/octet-stream">>,
+        [],
+        Auth
+    ),
+    case Res of
+        {ok, {{"HTTP/1.1", 204, _}, _Headers, _}} ->
+            ok;
+        {ok, {{"HTTP/1.1", 400, _}, _Headers, _} = Resp} ->
+            ct:pal("Backup upload failed: ~p", [Resp]),
+            {error, bad_request};
+        Err ->
+            Err
+    end.
+
+request(Method, NodePort, PathParts, Auth) ->
+    request(Method, NodePort, PathParts, [], [], Auth).
+
+request(Method, NodePort, PathParts, Body, Auth) ->
+    request(Method, NodePort, PathParts, [], Body, Auth).
+
+request(Method, NodePort, PathParts, QueryList, Body, Auth) ->
+    Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodePort), PathParts),
+    Query = unicode:characters_to_list(uri_string:compose_query(QueryList)),
+    emqx_mgmt_api_test_util:request_api(Method, Path, Query, Auth, Body).
+
+cluster(TC, Config) ->
+    Nodes = emqx_cth_cluster:start(
+        [
+            {api_data_backup_core1, #{role => core, apps => apps_spec(18085, TC)}},
+            {api_data_backup_core2, #{role => core, apps => apps_spec(18086, TC)}},
+            {api_data_backup_replicant, #{role => replicant, apps => apps_spec(18087, TC)}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
+    ),
+    Nodes.
+
+auth_header(Node) ->
+    {ok, API} = erpc:call(Node, emqx_common_test_http, create_default_app, []),
+    emqx_common_test_http:auth_header(API).
+
+wait_for_auth_replication(ReplNode) ->
+    wait_for_auth_replication(ReplNode, 100).
+
+wait_for_auth_replication(ReplNode, 0) ->
+    {error, {ReplNode, auth_not_ready}};
+wait_for_auth_replication(ReplNode, Retries) ->
+    try
+        {_Header, _Val} = erpc:call(ReplNode, emqx_common_test_http, default_auth_header, []),
+        ok
+    catch
+        _:_ ->
+            timer:sleep(1),
+            wait_for_auth_replication(ReplNode, Retries - 1)
+    end.
+
+apps_spec(APIPort, TC) ->
+    common_apps_spec() ++
+        app_spec_dashboard(APIPort) ++
+        upload_import_apps_spec(TC).
+
+common_apps_spec() ->
+    [
+        emqx,
+        emqx_conf,
+        emqx_management
+    ].
+
+app_spec_dashboard(APIPort) ->
+    [
+        {emqx_dashboard, #{
+            config =>
+                #{
+                    dashboard =>
+                        #{
+                            listeners =>
+                                #{
+                                    http =>
+                                        #{bind => APIPort}
+                                },
+                            default_username => "",
+                            default_password => ""
+                        }
+                }
+        }}
+    ].
+
+upload_import_apps_spec(TC) when
+    TC =:= t_upload_ee_backup;
+    TC =:= t_import_ee_backup;
+    TC =:= t_upload_ce_backup;
+    TC =:= t_import_ce_backup
+->
+    [
+        emqx_auth,
+        emqx_auth_http,
+        emqx_auth_jwt,
+        emqx_auth_mnesia,
+        emqx_rule_engine,
+        emqx_modules,
+        emqx_bridge
+    ];
+upload_import_apps_spec(_TC) ->
+    [].

TEMPAT SAMPAH
apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-file.tar.gz


TEMPAT SAMPAH
apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-upload.tar.gz


TEMPAT SAMPAH
apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ce.tar.gz


TEMPAT SAMPAH
apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ee.tar.gz


+ 1 - 0
changes/ce/feat-12017.en.md

@@ -0,0 +1 @@
+Implemented HTTP API for configuration and user data import/export.