Przeglądaj źródła

test: add tooling to make testruns more isolated and predictable

Also showcase their usage in some of the existing testsuites.
Andrew Mayorov 2 lat temu
rodzic
commit
f76f3b77d7

+ 1 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -67,6 +67,7 @@
 -export([
     emqx_cluster/1,
     emqx_cluster/2,
+    start_ekka/0,
     start_epmd/0,
     start_slave/2,
     stop_slave/1,

+ 1 - 2
apps/emqx/test/emqx_common_test_http.erl

@@ -87,8 +87,7 @@ create_default_app() ->
     ExpiredAt = Now + timer:minutes(10),
     emqx_mgmt_auth:create(
         ?DEFAULT_APP_ID, ?DEFAULT_APP_SECRET, true, ExpiredAt, <<"default app key for test">>
-    ),
-    ok.
+    ).
 
 delete_default_app() ->
     emqx_mgmt_auth:delete(?DEFAULT_APP_ID).

+ 411 - 0
apps/emqx/test/emqx_cth_cluster.erl

@@ -0,0 +1,411 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cth_cluster).
+
+-export([start/2]).
+-export([stop/1]).
+
+-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
+
+-define(TIMEOUT_NODE_START_MS, 15000).
+-define(TIMEOUT_APPS_START_MS, 30000).
+-define(TIMEOUT_NODE_STOP_S, 15).
+
+%%
+
+-type nodespec() :: {_ShortName :: atom(), #{
+    % DB Role
+    % Default: `core`
+    role => core | replicant,
+
+    % DB Backend
+    % Default: `mnesia` if there are no replicants in cluster, otherwise `rlog`
+    %
+    % NOTE
+    % Default are chosen with the intention of lowering the chance of observing
+    % inconsistencies due to data races (i.e. missing mria shards on nodes where some
+    % application hasn't been started yet).
+    db_backend => mnesia | rlog,
+
+    % Applications to start on the node
+    % Default: only applications needed for clustering are started
+    %
+    % NOTES
+    % 1. Apps needed for clustering started unconditionally.
+    %  * It's not possible to redefine their startup order.
+    %  * It's possible to add `{ekka, #{start => false}}` appspec though.
+    % 2. There are defaults applied to some appspecs if they present.
+    %  * We try to keep `emqx_conf` config consistent with default configuration of
+    %    clustering applications.
+    apps => [emqx_cth_suite:appspec()],
+
+    base_port => inet:port_number(),
+
+    % Node to join to in clustering phase
+    % If set to `undefined` this node won't try to join the cluster
+    % Default: no (first core node is used to join to by default)
+    join_to => node() | undefined,
+
+    %% Working directory
+    %% If this directory is not empty, starting up the node applications will fail
+    %% Default: "${ClusterOpts.work_dir}/${nodename}"
+    work_dir => file:name(),
+
+    % Tooling to manage nodes
+    % Default: `ct_slave`.
+    driver => ct_slave | slave
+}}.
+
+-spec start([nodespec()], ClusterOpts) ->
+    [node()]
+when
+    ClusterOpts :: #{
+        %% Working directory
+        %% Everything a test produces should go here. Each node's stuff should go in its
+        %% own directory.
+        work_dir := file:name()
+    }.
+start(Nodes, ClusterOpts) ->
+    NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
+    % ct:pal("starting cluster: ~p", [NodeSpecs]),
+    io:format(user, "starting cluster: ~p~n", [NodeSpecs]),
+    % 1. Start bare nodes with only basic applications running
+    _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS),
+    % 2. Start applications needed to enable clustering
+    % Generally, this causes some applications to restart, but we deliberately don't
+    % start them yet.
+    _ = lists:foreach(fun run_node_phase_cluster/1, NodeSpecs),
+    % 3. Start applications after cluster is formed
+    % Cluster-joins are complete, so they shouldn't restart in the background anymore.
+    _ = emqx_utils:pmap(fun run_node_phase_apps/1, NodeSpecs, ?TIMEOUT_APPS_START_MS),
+    % _ = lists:foreach(fun run_node_phase_apps/1, NodeSpecs),
+    [Node || #{name := Node} <- NodeSpecs].
+
+mk_nodespecs(Nodes, ClusterOpts) ->
+    NodeSpecs = lists:zipwith(
+        fun(N, {Name, Opts}) -> mk_init_nodespec(N, Name, Opts, ClusterOpts) end,
+        lists:seq(1, length(Nodes)),
+        Nodes
+    ),
+    CoreNodes = [Node || #{name := Node, role := core} <- NodeSpecs],
+    Backend =
+        case length(CoreNodes) of
+            L when L == length(NodeSpecs) ->
+                mnesia;
+            _ ->
+                rlog
+        end,
+    lists:map(
+        fun(Spec0) ->
+            Spec1 = maps:merge(#{core_nodes => CoreNodes, db_backend => Backend}, Spec0),
+            Spec2 = merge_default_appspecs(Spec1, NodeSpecs),
+            Spec3 = merge_clustering_appspecs(Spec2, NodeSpecs),
+            Spec3
+        end,
+        NodeSpecs
+    ).
+
+mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) ->
+    Node = node_name(Name),
+    BasePort = base_port(N),
+    WorkDir = maps:get(work_dir, ClusterOpts),
+    Defaults = #{
+        name => Node,
+        role => core,
+        apps => [],
+        base_port => BasePort,
+        work_dir => filename:join([WorkDir, Node]),
+        driver => ct_slave
+    },
+    maps:merge(Defaults, NodeOpts).
+
+merge_default_appspecs(#{apps := Apps} = Spec, NodeSpecs) ->
+    Spec#{apps => [mk_node_appspec(App, Spec, NodeSpecs) || App <- Apps]}.
+
+merge_clustering_appspecs(#{apps := Apps} = Spec, NodeSpecs) ->
+    AppsClustering = lists:map(
+        fun(App) ->
+            case lists:keyfind(App, 1, Apps) of
+                AppSpec = {App, _} ->
+                    AppSpec;
+                false ->
+                    {App, default_appspec(App, Spec, NodeSpecs)}
+            end
+        end,
+        ?APPS_CLUSTERING
+    ),
+    AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
+    Spec#{apps => AppsClustering ++ AppsRest}.
+
+mk_node_appspec({App, Opts}, Spec, NodeSpecs) ->
+    {App, emqx_cth_suite:merge_appspec(default_appspec(App, Spec, NodeSpecs), Opts)};
+mk_node_appspec(App, Spec, NodeSpecs) ->
+    {App, default_appspec(App, Spec, NodeSpecs)}.
+
+default_appspec(gen_rpc, #{name := Node}, NodeSpecs) ->
+    NodePorts = lists:foldl(
+        fun(#{name := CNode, base_port := Port}, Acc) ->
+            Acc#{CNode => {tcp, gen_rpc_port(Port)}}
+        end,
+        #{},
+        NodeSpecs
+    ),
+    {tcp, Port} = maps:get(Node, NodePorts),
+    #{
+        override_env => [
+            % NOTE
+            % This is needed to make sure `gen_rpc` peers will find each other.
+            {port_discovery, manual},
+            {tcp_server_port, Port},
+            {client_config_per_node, {internal, NodePorts}}
+        ]
+    };
+default_appspec(mria, #{role := Role, db_backend := Backend}, _NodeSpecs) ->
+    #{
+        override_env => [
+            {node_role, Role},
+            {db_backend, Backend}
+        ]
+    };
+default_appspec(ekka, Spec, _NodeSpecs) ->
+    Overrides =
+        case get_cluster_seeds(Spec) of
+            [_ | _] = Seeds ->
+                % NOTE
+                % Presumably, this is needed for replicants to find core nodes.
+                [{cluster_discovery, {static, [{seeds, Seeds}]}}];
+            [] ->
+                []
+        end,
+    #{
+        override_env => Overrides
+    };
+default_appspec(emqx_conf, Spec, _NodeSpecs) ->
+    % NOTE
+    % This usually sets up a lot of `gen_rpc` / `mria` / `ekka` application envs in
+    % `emqx_config:init_load/2` during configuration mapping, so we need to keep them
+    % in sync with the values we set up here.
+    #{
+        name := Node,
+        role := Role,
+        db_backend := Backend,
+        base_port := BasePort,
+        work_dir := WorkDir
+    } = Spec,
+    Listeners = [
+        #{Type => #{default => #{bind => format("127.0.0.1:~p", [Port])}}}
+     || Type <- [tcp, ssl, ws, wss],
+        Port <- [listener_port(BasePort, Type)]
+    ],
+    Cluster =
+        case get_cluster_seeds(Spec) of
+            [_ | _] = Seeds ->
+                % NOTE
+                % Presumably, this is needed for replicants to find core nodes.
+                #{discovery_strategy => static, static => #{seeds => Seeds}};
+            [] ->
+                #{}
+        end,
+    #{
+        config => #{
+            node => #{
+                name => Node,
+                role => Role,
+                cookie => erlang:get_cookie(),
+                % TODO: will it be synced to the same value eventually?
+                data_dir => unicode:characters_to_binary(WorkDir),
+                db_backend => Backend
+            },
+            cluster => Cluster,
+            rpc => #{
+                % NOTE
+                % This (along with `gen_rpc` env overrides) is needed to make sure `gen_rpc`
+                % peers will find each other.
+                protocol => tcp,
+                tcp_server_port => gen_rpc_port(BasePort),
+                port_discovery => manual
+            },
+            listeners => lists:foldl(fun maps:merge/2, #{}, Listeners)
+        }
+    };
+default_appspec(_App, _, _) ->
+    #{}.
+
+get_cluster_seeds(#{join_to := undefined}) ->
+    [];
+get_cluster_seeds(#{join_to := Node}) ->
+    [Node];
+get_cluster_seeds(#{core_nodes := CoreNodes}) ->
+    CoreNodes.
+
+start_node_init(Spec = #{name := Node}) ->
+    Node = start_bare_node(Node, Spec),
+    pong = net_adm:ping(Node),
+    % ok = set_work_dir(Node, Opts),
+    ok = set_node_opts(Node, Spec),
+    ok = snabbkaffe:forward_trace(Node),
+    ok.
+
+run_node_phase_cluster(Spec = #{name := Node}) ->
+    ok = load_apps(Node, Spec),
+    ok = start_apps_clustering(Node, Spec),
+    ok = maybe_join_cluster(Node, Spec),
+    % ?HERE("cluster view @ ~p: ~p", [Node, erpc:call(Node, mria, info, [])]),
+    ok.
+
+run_node_phase_apps(Spec = #{name := Node}) ->
+    ok = start_apps(Node, Spec),
+    ok.
+
+% set_work_dir(Node, #{work_dir := WorkDir}) ->
+%     erpc:call(Node, file, set_cwd, [WorkDir]).
+
+set_node_opts(Node, Spec) ->
+    erpc:call(Node, persistent_term, put, [{?MODULE, opts}, Spec]).
+
+get_node_opts(Node) ->
+    erpc:call(Node, persistent_term, get, [{?MODULE, opts}]).
+
+load_apps(Node, #{apps := Apps}) ->
+    erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
+
+start_apps_clustering(Node, #{apps := Apps} = Spec) ->
+    SuiteOpts = maps:with([work_dir], Spec),
+    AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING],
+    Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]),
+    ct:pal("[start_apps_clustering] started apps on ~p: ~p", [Node, Started]),
+    ok.
+
+start_apps(Node, #{apps := Apps} = Spec) ->
+    SuiteOpts = maps:with([work_dir], Spec),
+    AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)],
+    Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]),
+    ct:pal("[start_apps] started apps on ~p: ~p", [Node, Started]),
+    ok.
+
+maybe_join_cluster(_Node, #{role := replicant}) ->
+    ok;
+maybe_join_cluster(Node, Spec) ->
+    case get_cluster_seeds(Spec) of
+        [JoinTo | _] ->
+            ok = join_cluster(Node, JoinTo);
+        [] ->
+            ok
+    end.
+
+join_cluster(Node, JoinTo) ->
+    case erpc:call(Node, ekka, join, [JoinTo]) of
+        ok ->
+            ok;
+        ignore ->
+            ok;
+        Error ->
+            error({failed_to_join_cluster, #{node => Node, error => Error}})
+    end.
+
+%%
+
+stop(Nodes) ->
+    _ = emqx_utils:pmap(fun stop_node/1, Nodes, ?TIMEOUT_NODE_STOP_S * 1000),
+    ok.
+
+stop_node(Name) ->
+    Node = node_name(Name),
+    try get_node_opts(Node) of
+        Opts ->
+            stop_node(Name, Opts)
+    catch
+        error:{erpc, _} ->
+            ok
+    end.
+
+stop_node(Node, #{driver := ct_slave}) ->
+    case ct_slave:stop(Node, [{stop_timeout, ?TIMEOUT_NODE_STOP_S}]) of
+        {ok, _} ->
+            ok;
+        {error, Reason, _} when Reason == not_connected; Reason == not_started ->
+            ok
+    end;
+stop_node(Node, #{driver := slave}) ->
+    slave:stop(Node).
+
+%% Ports
+
+base_port(Number) ->
+    10000 + Number * 100.
+
+gen_rpc_port(BasePort) ->
+    BasePort - 1.
+
+listener_port(BasePort, tcp) ->
+    BasePort;
+listener_port(BasePort, ssl) ->
+    BasePort + 1;
+listener_port(BasePort, quic) ->
+    BasePort + 2;
+listener_port(BasePort, ws) ->
+    BasePort + 3;
+listener_port(BasePort, wss) ->
+    BasePort + 4.
+
+%%
+
+start_bare_node(Name, #{driver := ct_slave}) ->
+    {ok, Node} = ct_slave:start(
+        node_name(Name),
+        [
+            {kill_if_fail, true},
+            {monitor_master, true},
+            {init_timeout, 20_000},
+            {startup_timeout, 20_000},
+            {erl_flags, erl_flags()},
+            {env, []}
+        ]
+    ),
+    Node;
+start_bare_node(Name, #{driver := ct_slave}) ->
+    {ok, Node} = slave:start_link(host(), Name, ebin_path()),
+    Node.
+
+erl_flags() ->
+    %% One core and redirecting logs to master
+    "+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().
+
+ebin_path() ->
+    string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
+
+is_lib(Path) ->
+    string:prefix(Path, code:lib_dir()) =:= nomatch andalso
+        string:str(Path, "_build/default/plugins") =:= 0.
+
+node_name(Name) ->
+    case string:tokens(atom_to_list(Name), "@") of
+        [_Name, _Host] ->
+            %% the name already has a @
+            Name;
+        _ ->
+            list_to_atom(atom_to_list(Name) ++ "@" ++ host())
+    end.
+
+host() ->
+    [_, Host] = string:tokens(atom_to_list(node()), "@"),
+    Host.
+
+%%
+
+format(Format, Args) ->
+    unicode:characters_to_binary(io_lib:format(Format, Args)).

+ 339 - 0
apps/emqx/test/emqx_cth_suite.erl

@@ -0,0 +1,339 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cth_suite).
+
+-include_lib("emqx/include/emqx_authentication.hrl").
+
+-export([start/2]).
+-export([stop/1]).
+
+-export([load_apps/1]).
+-export([start_apps/2]).
+-export([start_app/2]).
+-export([start_app/3]).
+-export([stop_apps/1]).
+
+-export([merge_appspec/2]).
+
+-export_type([appspec/0]).
+-export_type([appspec_opts/0]).
+
+-define(NOW,
+    (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}]))
+).
+-define(HERE(FMT, ARGS),
+    io:format(user, "~s [~p] ~p/~p:~p " FMT "~n", [
+        ?NOW, node(), ?FUNCTION_NAME, ?FUNCTION_ARITY, ?LINE | ARGS
+    ])
+).
+
+%%
+
+-type appname() :: atom().
+-type appspec() :: {appname(), appspec_opts()}.
+
+-type appspec_opts() :: #{
+    %% 1. Enable loading application config
+    %% If not defined or set to `false`, this step will be skipped.
+    %% If application is missing a schema module, this step will fail.
+    %% Merging amounts to appending, unless `false` is used, then merge result is also `false`.
+    config => iodata() | emqx_config:config() | emqx_config:raw_config() | false,
+
+    %% 2. Override the application environment
+    %% If not defined or set to `false`, this step will be skipped.
+    %% Merging amounts to appending, unless `false` is used, then merge result is `[]`.
+    override_env => [{atom(), term()}] | false,
+
+    %% 3. Perform anything right before starting the application
+    %% If not defined or set to `false`, this step will be skipped.
+    %% Merging amounts to redefining.
+    before_start => fun(() -> _) | fun((appname()) -> _) | false,
+
+    %% 4. Starting the application
+    %% If not defined or set to `true`, `application:ensure_all_started/1` is used.
+    %% If custom function is used, it should return list of all applications that were started.
+    %% If set to `false`, application will not be started.
+    %% Merging amounts to redefining.
+    start => fun(() -> {ok, [appname()]}) | fun((appname()) -> {ok, [appname()]}) | boolean(),
+
+    %% 5. Perform anything right after starting the application
+    %% If not defined or set to `false`, this step will be skipped.
+    %% Merging amounts to redefining.
+    after_start => fun(() -> _) | fun((appname()) -> _) | false
+}.
+
+%% @doc Start applications with a clean slate.
+%% Provided appspecs will be merged with defaults defined in `default_appspec/1`.
+-spec start([appname() | appspec()], SuiteOpts) ->
+    StartedApps :: [appname()]
+when
+    SuiteOpts :: #{
+        %% Working directory
+        %% Everything a test produces should go here. If this directory is not empty,
+        %% function will raise an error.
+        work_dir := file:name()
+    }.
+start(Apps, SuiteOpts = #{work_dir := WorkDir}) ->
+    % 1. Prepare appspec instructions
+    AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps],
+    % 2. Load every app so that stuff scanning attributes of loaded modules works
+    ok = lists:foreach(fun load_appspec/1, AppSpecs),
+    % 3. Verify that we're running with a clean state.
+    ok = filelib:ensure_dir(filename:join(WorkDir, foo)),
+    ok = verify_clean_suite_state(SuiteOpts),
+    % 4. Setup isolated mnesia directory
+    ok = emqx_common_test_helpers:load(mnesia),
+    ok = application:set_env(mnesia, dir, filename:join([WorkDir, mnesia])),
+    % 5. Start ekka separately.
+    % For some reason it's designed to be started in non-regular way, so we have to track
+    % applications started in the process manually.
+    EkkaSpecs = [{App, proplists:get_value(App, AppSpecs, #{})} || App <- [gen_rpc, mria, ekka]],
+    EkkaApps = start_apps(EkkaSpecs, SuiteOpts),
+    % 6. Start apps following instructions.
+    RestSpecs = [AppSpec || AppSpec <- AppSpecs, not lists:member(AppSpec, EkkaSpecs)],
+    EkkaApps ++ start_appspecs(RestSpecs).
+
+load_apps(Apps) ->
+    lists:foreach(fun load_appspec/1, [mk_appspec(App, #{}) || App <- Apps]).
+
+load_appspec({App, _Opts}) ->
+    ok = emqx_common_test_helpers:load(App),
+    load_app_deps(App).
+
+load_app_deps(App) ->
+    AlreadyLoaded = [A || {A, _, _} <- application:loaded_applications()],
+    case application:get_key(App, applications) of
+        {ok, Deps} ->
+            Apps = Deps -- AlreadyLoaded,
+            ok = lists:foreach(fun emqx_common_test_helpers:load/1, Apps),
+            ok = lists:foreach(fun load_app_deps/1, Apps);
+        undefined ->
+            ok
+    end.
+
+start_apps(Apps, SuiteOpts) ->
+    start_appspecs([mk_appspec(App, SuiteOpts) || App <- Apps]).
+
+start_app(App, StartOpts) ->
+    start_app(App, StartOpts, #{}).
+
+start_app(App, StartOpts, SuiteOpts) ->
+    start_appspecs([mk_appspec({App, StartOpts}, SuiteOpts)]).
+
+start_appspecs(AppSpecs) ->
+    lists:flatmap(
+        fun({App, Spec}) -> start_appspec(App, Spec) end,
+        AppSpecs
+    ).
+
+mk_appspec({App, Opts}, SuiteOpts) ->
+    Defaults = default_appspec(App, SuiteOpts),
+    {App, merge_appspec(Defaults, init_spec(Opts))};
+mk_appspec(App, SuiteOpts) ->
+    Defaults = default_appspec(App, SuiteOpts),
+    {App, Defaults}.
+
+init_spec(Opts = #{}) ->
+    Opts;
+init_spec(Config) when is_list(Config); is_binary(Config) ->
+    #{config => [Config, "\n"]}.
+
+start_appspec(App, StartOpts) ->
+    ?HERE("~p StartOpts=~0p", [App, StartOpts]),
+    _ = maybe_configure_app(App, StartOpts),
+    ?HERE("<- maybe_configure_app/2", []),
+    _ = maybe_override_env(App, StartOpts),
+    ?HERE("<- maybe_override_env/2", []),
+    _ = maybe_before_start(App, StartOpts),
+    ?HERE("<- maybe_before_start/2", []),
+    case maybe_start(App, StartOpts) of
+        {ok, Started} ->
+            ?HERE("<- maybe_start/1 = ~0p", [Started]),
+            _ = maybe_after_start(App, StartOpts),
+            ?HERE("<- maybe_after_start/2", []),
+            Started;
+        {error, Reason} ->
+            error({failed_to_start_app, App, Reason})
+    end.
+
+maybe_configure_app(_App, #{config := false}) ->
+    ok;
+maybe_configure_app(App, #{config := Config}) ->
+    case app_schema(App) of
+        {ok, SchemaModule} ->
+            configure_app(App, SchemaModule, Config);
+        {error, Reason} ->
+            error({failed_to_configure_app, App, Reason})
+    end;
+maybe_configure_app(_App, #{}) ->
+    ok.
+
+configure_app(_App, SchemaModule, Config) ->
+    ok = load_app_config(SchemaModule, Config),
+    % ?HERE("-- roots = ~p", [
+    %     % App,
+    %     [
+    %         emqx_config:get_root([binary_to_atom(Root)])
+    %      || Root <- hocon_schema:root_names(SchemaModule)
+    %     ]
+    % ]),
+    ok.
+
+maybe_override_env(App, #{override_env := Env = [{_, _} | _]}) ->
+    ok = application:set_env([{App, Env}]);
+maybe_override_env(_App, #{}) ->
+    ok.
+
+% maybe_before_start(_App, #{start := false}) ->
+%     ok;
+maybe_before_start(App, #{before_start := Fun}) when is_function(Fun, 1) ->
+    Fun(App);
+maybe_before_start(_App, #{before_start := Fun}) when is_function(Fun, 0) ->
+    Fun();
+maybe_before_start(_App, #{}) ->
+    ok.
+
+maybe_start(_App, #{start := false}) ->
+    {ok, []};
+maybe_start(_App, #{start := Fun}) when is_function(Fun, 0) ->
+    Fun();
+maybe_start(App, #{start := Fun}) when is_function(Fun, 1) ->
+    Fun(App);
+maybe_start(App, #{}) ->
+    application:ensure_all_started(App).
+
+% maybe_after_start(_App, #{start := false}) ->
+%     ok;
+maybe_after_start(App, #{after_start := Fun}) when is_function(Fun, 1) ->
+    Fun(App);
+maybe_after_start(_App, #{after_start := Fun}) when is_function(Fun, 0) ->
+    Fun();
+maybe_after_start(_App, #{}) ->
+    ok.
+
+-spec merge_appspec(appspec_opts(), appspec_opts()) ->
+    appspec_opts().
+merge_appspec(Opts1, Opts2) ->
+    maps:merge_with(
+        fun
+            (config, C1, C2) -> merge_config(C1, C2);
+            (override_env, E1, E2) -> merge_envs(E1, E2);
+            (_Opt, _Val1, Val2) -> Val2
+        end,
+        init_spec(Opts1),
+        init_spec(Opts2)
+    ).
+
+merge_envs(false, E2) ->
+    E2;
+merge_envs(_E, false) ->
+    [];
+merge_envs(E1, E2) ->
+    E1 ++ E2.
+
+merge_config(false, C2) ->
+    C2;
+merge_config(_C, false) ->
+    false;
+merge_config(C1, C2) ->
+    [render_config(C1), "\n", render_config(C2)].
+
+default_appspec(ekka, _SuiteOpts) ->
+    #{
+        start => fun start_ekka/0
+    };
+default_appspec(emqx, SuiteOpts) ->
+    #{
+        override_env => [{data_dir, maps:get(work_dir, SuiteOpts, "data")}]
+    };
+default_appspec(emqx_conf, SuiteOpts) ->
+    #{
+        config => #{
+            node => #{
+                name => node(),
+                cookie => erlang:get_cookie(),
+                % FIXME
+                data_dir => unicode:characters_to_binary(maps:get(work_dir, SuiteOpts, "data"))
+            }
+        },
+        % NOTE
+        % We mark config loaded before starting `emqx_conf` so that it won't
+        % overwrite evenrything with a default configuration.
+        before_start => fun emqx_app:set_init_config_load_done/0
+    };
+default_appspec(emqx_dashboard, _SuiteOpts) ->
+    #{
+        after_start => fun() ->
+            true = emqx_dashboard_listener:is_ready(infinity)
+        end
+    };
+default_appspec(_, _) ->
+    #{}.
+
+%%
+
+start_ekka() ->
+    ok = emqx_common_test_helpers:start_ekka(),
+    {ok, [mnesia, ekka]}.
+
+%%
+
+-spec stop(_StartedApps :: [appname()]) ->
+    ok.
+stop(Apps) ->
+    ok = stop_apps(Apps),
+    clean_suite_state().
+
+-spec stop_apps(_StartedApps :: [appname()]) ->
+    ok.
+stop_apps(Apps) ->
+    ok = lists:foreach(fun application:stop/1, lists:reverse(Apps)),
+    ok = lists:foreach(fun application:unload/1, Apps).
+
+%%
+
+verify_clean_suite_state(#{work_dir := WorkDir}) ->
+    {ok, []} = file:list_dir(WorkDir),
+    none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none),
+    [] = emqx_config:get_root_names(),
+    ok.
+
+clean_suite_state() ->
+    _ = persistent_term:erase(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY),
+    _ = emqx_config:erase_all(),
+    ok.
+
+%%
+
+app_schema(App) ->
+    Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
+    try is_list(Mod:roots()) of
+        true -> {ok, Mod};
+        false -> {error, schema_no_roots}
+    catch
+        error:undef ->
+            {error, schema_not_found}
+    end.
+
+load_app_config(SchemaModule, Config) ->
+    ?HERE("~p ~ts", [SchemaModule, render_config(Config)]),
+    emqx_config:init_load(SchemaModule, render_config(Config)).
+
+render_config(Config = #{}) ->
+    unicode:characters_to_binary(hocon_pp:do(Config, #{}));
+render_config(Config) ->
+    unicode:characters_to_binary(Config).

+ 2 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -172,7 +172,8 @@ init_node(Type) ->
             ),
             ok = emqx_dashboard:start_listeners(),
             ready = emqx_dashboard_listener:regenerate_minirest_dispatch(),
-            emqx_common_test_http:create_default_app();
+            {ok, _App} = emqx_common_test_http:create_default_app(),
+            ok;
         regular ->
             ok
     end.

+ 1 - 0
apps/emqx_ft/src/emqx_ft.app.src

@@ -7,6 +7,7 @@
         kernel,
         stdlib,
         gproc,
+        emqx,
         emqx_s3
     ]},
     {env, []},

+ 33 - 46
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -65,31 +65,25 @@ suite() ->
     [{timetrap, {seconds, 90}}].
 
 init_per_suite(Config) ->
-    ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)),
-    Config.
+    % NOTE
+    % Inhibit local fs GC to simulate it isn't fast enough to collect
+    % complete transfers.
+    Storage = emqx_utils_maps:deep_merge(
+        emqx_ft_test_helpers:local_storage(Config),
+        #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
+    ),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}}
+        ],
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{suite_apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
-set_special_configs(Config) ->
-    fun
-        (emqx_ft) ->
-            % NOTE
-            % Inhibit local fs GC to simulate it isn't fast enough to collect
-            % complete transfers.
-            Storage = emqx_utils_maps:deep_merge(
-                emqx_ft_test_helpers:local_storage(Config),
-                #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => <<"0s">>}}}}
-            ),
-            emqx_ft_test_helpers:load_config(#{
-                <<"enable">> => true,
-                <<"storage">> => Storage
-            });
-        (_) ->
-            ok
-    end.
-
 init_per_testcase(Case, Config) ->
     ClientId = atom_to_binary(Case),
     case ?config(group, Config) of
@@ -105,39 +99,32 @@ end_per_testcase(_Case, Config) ->
     ok.
 
 init_per_group(Group = cluster, Config) ->
+    WorkDir = ?config(priv_dir, Config),
     Cluster = mk_cluster_specs(Config),
-    ct:pal("Starting ~p", [Cluster]),
-    Nodes = [
-        emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()})
-     || {Name, Opts} <- Cluster
-    ],
+    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
     [{group, Group}, {cluster_nodes, Nodes} | Config];
 init_per_group(Group, Config) ->
     [{group, Group} | Config].
 
 end_per_group(cluster, Config) ->
-    ok = lists:foreach(
-        fun emqx_ft_test_helpers:stop_additional_node/1,
-        ?config(cluster_nodes, Config)
-    );
+    ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
 end_per_group(_Group, _Config) ->
     ok.
 
-mk_cluster_specs(Config) ->
-    Specs = [
-        {core, emqx_ft_SUITE1, #{listener_ports => [{tcp, 2883}]}},
-        {core, emqx_ft_SUITE2, #{listener_ports => [{tcp, 3883}]}}
-    ],
-    CommOpts = [
-        {env, [{emqx, boot_modules, [broker, listeners]}]},
-        {apps, [emqx_ft]},
-        {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
-        {env_handler, set_special_configs(Config)}
-    ],
-    emqx_common_test_helpers:emqx_cluster(
-        Specs,
-        CommOpts
-    ).
+mk_cluster_specs(_Config) ->
+    CommonOpts = #{
+        role => core,
+        join_to => node(),
+        apps => [
+            {emqx_conf, #{start => false}},
+            {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
+            {emqx_ft, "file_transfer { enable = true }"}
+        ]
+    },
+    [
+        {emqx_ft_SUITE1, CommonOpts},
+        {emqx_ft_SUITE2, CommonOpts}
+    ].
 
 %%--------------------------------------------------------------------
 %% Tests

+ 96 - 68
apps/emqx_ft/test/emqx_ft_api_SUITE.erl

@@ -24,8 +24,6 @@
 
 -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
 
--define(SUITE_APPS, [emqx_conf, emqx_ft]).
-
 all() ->
     [
         {group, single},
@@ -35,62 +33,76 @@ all() ->
 groups() ->
     [
         {single, [], emqx_common_test_helpers:all(?MODULE)},
-        {cluster, [], emqx_common_test_helpers:all(?MODULE)}
+        {cluster, [], emqx_common_test_helpers:all(?MODULE) -- [t_ft_disabled]}
     ].
 
 suite() ->
     [{timetrap, {seconds, 90}}].
 
 init_per_suite(Config) ->
-    ok = emqx_mgmt_api_test_util:init_suite(
-        [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
-    ),
-    {ok, _} = emqx:update_config([rpc, port_discovery], manual),
     Config.
+
 end_per_suite(_Config) ->
-    ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
     ok.
 
+init_per_group(Group = single, Config) ->
+    WorkDir = ?config(priv_dir, Config),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{}},
+            {emqx_ft, "file_transfer { enable = true }"},
+            {emqx_management, #{}},
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    {ok, App} = emqx_common_test_http:create_default_app(),
+    [{group, Group}, {group_apps, Apps}, {api, App} | Config];
 init_per_group(Group = cluster, Config) ->
+    WorkDir = ?config(priv_dir, Config),
     Cluster = mk_cluster_specs(Config),
-    ct:pal("Starting ~p", [Cluster]),
-    Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
-    InitResult = erpc:multicall(Nodes, fun() -> init_node(Config) end),
-    [] = [{Node, Error} || {Node, {R, Error}} <- lists:zip(Nodes, InitResult), R /= ok],
-    [{group, Group}, {cluster_nodes, Nodes} | Config];
-init_per_group(Group, Config) ->
-    [{group, Group} | Config].
+    Nodes = [Node1 | _] = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
+    {ok, App} = erpc:call(Node1, emqx_common_test_http, create_default_app, []),
+    [{group, Group}, {cluster_nodes, Nodes}, {api, App} | Config].
 
+end_per_group(single, Config) ->
+    {ok, _} = emqx_common_test_http:delete_default_app(),
+    ok = emqx_cth_suite:stop(?config(group_apps, Config));
 end_per_group(cluster, Config) ->
-    ok = lists:foreach(
-        fun emqx_ft_test_helpers:stop_additional_node/1,
-        ?config(cluster_nodes, Config)
-    );
+    ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
 end_per_group(_Group, _Config) ->
     ok.
 
 mk_cluster_specs(_Config) ->
-    Specs = [
-        {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}},
-        {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}},
-        {replicant, emqx_ft_api_SUITE3, #{listener_ports => [{tcp, 4883}]}}
+    Apps = [
+        {emqx_conf, #{start => false}},
+        {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
+        {emqx_ft, "file_transfer { enable = true }"},
+        {emqx_management, #{}}
     ],
-    CommOpts = #{
-        env => [
-            {mria, db_backend, rlog},
-            {emqx, boot_modules, [broker, listeners]}
-        ],
-        apps => [],
-        load_apps => ?SUITE_APPS,
-        conf => [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]
-    },
-    emqx_common_test_helpers:emqx_cluster(
-        Specs,
-        CommOpts
-    ).
-
-init_node(Config) ->
-    ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, emqx_ft_test_helpers:env_handler(Config)).
+    DashboardConfig =
+        "dashboard { \n"
+        "    listeners.http { enable = true, bind = 0 } \n"
+        "    default_username = \"\" \n"
+        "    default_password = \"\" \n"
+        "}\n",
+    [
+        {emqx_ft_api_SUITE1, #{
+            role => core,
+            apps => Apps ++
+                [
+                    {emqx_dashboard, DashboardConfig ++ "dashboard.listeners.http.bind = 18083"}
+                ]
+        }},
+        {emqx_ft_api_SUITE2, #{
+            role => core,
+            apps => Apps ++ [{emqx_dashboard, DashboardConfig}]
+        }},
+        {emqx_ft_api_SUITE3, #{
+            role => replicant,
+            apps => Apps ++ [{emqx_dashboard, DashboardConfig}]
+        }}
+    ].
 
 init_per_testcase(Case, Config) ->
     [{tc, Case} | Config].
@@ -111,7 +123,7 @@ t_list_files(Config) ->
     ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
 
     {ok, 200, #{<<"files">> := Files}} =
-        request_json(get, uri(["file_transfer", "files"])),
+        request_json(get, uri(["file_transfer", "files"]), Config),
 
     ?assertMatch(
         [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
@@ -119,7 +131,7 @@ t_list_files(Config) ->
     ),
 
     {ok, 200, #{<<"files">> := FilesTransfer}} =
-        request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
+        request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
 
     ?assertMatch(
         [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
@@ -128,21 +140,23 @@ t_list_files(Config) ->
 
     ?assertMatch(
         {ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
-        request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]))
+        request_json(get, uri(["file_transfer", "files", ClientId, <<"no-such-file">>]), Config)
     ).
 
 t_download_transfer(Config) ->
     ClientId = client_id(Config),
     FileId = <<"f1">>,
 
-    Node = lists:last(test_nodes(Config)),
-    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
+    Nodes = [Node | _] = test_nodes(Config),
+    NodeUpload = lists:last(Nodes),
+    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, NodeUpload),
 
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
         request_json(
             get,
-            uri(["file_transfer", "file"]) ++ query(#{fileref => FileId})
+            uri(["file_transfer", "file"]) ++ query(#{fileref => FileId}),
+            Config
         )
     ),
 
@@ -151,7 +165,8 @@ t_download_transfer(Config) ->
         request(
             get,
             uri(["file_transfer", "file"]) ++
-                query(#{fileref => FileId, node => <<"nonode@nohost">>})
+                query(#{fileref => FileId, node => <<"nonode@nohost">>}),
+            Config
         )
     ),
 
@@ -160,7 +175,8 @@ t_download_transfer(Config) ->
         request(
             get,
             uri(["file_transfer", "file"]) ++
-                query(#{fileref => <<"unknown_file">>, node => node()})
+                query(#{fileref => <<"unknown_file">>, node => Node}),
+            Config
         )
     ),
 
@@ -169,7 +185,8 @@ t_download_transfer(Config) ->
         request_json(
             get,
             uri(["file_transfer", "file"]) ++
-                query(#{fileref => <<>>, node => node()})
+                query(#{fileref => <<>>, node => Node}),
+            Config
         )
     ),
 
@@ -178,14 +195,15 @@ t_download_transfer(Config) ->
         request_json(
             get,
             uri(["file_transfer", "file"]) ++
-                query(#{fileref => <<"/etc/passwd">>, node => node()})
+                query(#{fileref => <<"/etc/passwd">>, node => Node}),
+            Config
         )
     ),
 
     {ok, 200, #{<<"files">> := [File]}} =
-        request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
+        request_json(get, uri(["file_transfer", "files", ClientId, FileId]), Config),
 
-    {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)),
+    {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File), Config),
 
     ?assertEqual(
         <<"data">>,
@@ -209,44 +227,47 @@ t_list_files_paging(Config) ->
 
     ?assertMatch(
         {ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
-        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}))
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 3}), Config)
     ),
 
     {ok, 200, #{<<"files">> := Files}} =
-        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100})),
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config),
 
     ?assert(length(Files) >= NFiles),
 
     ?assertNotMatch(
         {ok, 200, #{<<"cursor">> := _}},
-        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}))
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 100}), Config)
     ),
 
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
-        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}))
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0}), Config)
     ),
 
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
-        request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}))
+        request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>}), Config)
     ),
 
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
-        request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}))
+        request_json(
+            get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>}), Config
+        )
     ),
 
     ?assertMatch(
         {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
         request_json(
             get,
-            uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>})
+            uri(["file_transfer", "files"]) ++ query(#{following => <<"whatsthat!?">>}),
+            Config
         )
     ),
 
     PageThrough = fun PageThrough(Query, Acc) ->
-        case request_json(get, uri(["file_transfer", "files"]) ++ query(Query)) of
+        case request_json(get, uri(["file_transfer", "files"]) ++ query(Query), Config) of
             {ok, 200, #{<<"files">> := FilesPage, <<"cursor">> := Cursor}} ->
                 PageThrough(Query#{following => Cursor}, Acc ++ FilesPage);
             {ok, 200, #{<<"files">> := FilesPage}} ->
@@ -258,17 +279,18 @@ t_list_files_paging(Config) ->
     ?assertEqual(Files, PageThrough(#{limit => 8}, [])),
     ?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
 
-t_ft_disabled(_Config) ->
+t_ft_disabled(Config) ->
     ?assertMatch(
         {ok, 200, _},
-        request_json(get, uri(["file_transfer", "files"]))
+        request_json(get, uri(["file_transfer", "files"]), Config)
     ),
 
     ?assertMatch(
         {ok, 400, _},
         request_json(
             get,
-            uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>})
+            uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
+            Config
         )
     ),
 
@@ -276,14 +298,15 @@ t_ft_disabled(_Config) ->
 
     ?assertMatch(
         {ok, 503, _},
-        request_json(get, uri(["file_transfer", "files"]))
+        request_json(get, uri(["file_transfer", "files"]), Config)
     ),
 
     ?assertMatch(
         {ok, 503, _},
         request_json(
             get,
-            uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()})
+            uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()}),
+            Config
         )
     ).
 
@@ -308,11 +331,12 @@ mk_file_id(Prefix, N) ->
 mk_file_name(N) ->
     "file." ++ integer_to_list(N).
 
-request(Method, Url) ->
-    emqx_mgmt_api_test_util:request(Method, Url, []).
+request(Method, Url, Config) ->
+    Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
+    emqx_mgmt_api_test_util:request_api(Method, Url, [], auth_header(Config), [], Opts).
 
-request_json(Method, Url) ->
-    case emqx_mgmt_api_test_util:request(Method, Url, []) of
+request_json(Method, Url, Config) ->
+    case request(Method, Url, Config) of
         {ok, Code, Body} ->
             {ok, Code, json(Body)};
         Otherwise ->
@@ -326,6 +350,10 @@ query(Params) ->
     KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
     "?" ++ string:join(KVs, "&").
 
+auth_header(Config) ->
+    #{api_key := ApiKey, api_secret := Secret} = ?config(api, Config),
+    emqx_common_test_http:auth_header(binary_to_list(ApiKey), binary_to_list(Secret)).
+
 uri_encode(T) ->
     emqx_http_lib:uri_encode(to_list(T)).
 

+ 3 - 3
apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl

@@ -36,11 +36,11 @@ all() ->
     ].
 
 init_per_suite(Config) ->
-    Apps = application:ensure_all_started(gproc),
+    {ok, Apps} = application:ensure_all_started(gproc),
     [{suite_apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok.
+end_per_suite(Config) ->
+    emqx_cth_suite:stop_apps(?config(suite_apps, Config)).
 
 init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),

+ 13 - 15
apps/emqx_ft/test/emqx_ft_conf_SUITE.erl

@@ -31,22 +31,20 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     ok.
 
-init_per_testcase(_Case, Config) ->
-    _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
-    ok = emqx_common_test_helpers:start_apps(
-        [emqx_conf, emqx_ft], fun
-            (emqx_ft) ->
-                emqx_ft_test_helpers:load_config(#{});
-            (_) ->
-                ok
-        end
-    ),
-    {ok, _} = emqx:update_config([rpc, port_discovery], manual),
-    Config.
+init_per_testcase(Case, Config) ->
+    WorkDir = filename:join(?config(priv_dir, Config), Case),
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx_conf, #{}},
+            {emqx_ft, #{config => "file_transfer {}"}}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    [{suite_apps, Apps} | Config].
 
-end_per_testcase(_Case, _Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
-    ok = emqx_config:erase(file_transfer).
+end_per_testcase(_Case, Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
+    ok.
 
 %%--------------------------------------------------------------------
 %% Tests

+ 17 - 20
apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl

@@ -22,45 +22,42 @@
 -include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("snabbkaffe/include/test_macros.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    _ = application:load(emqx_ft),
-    ok = emqx_common_test_helpers:start_apps([]),
-    Config.
+    Apps = emqx_cth_suite:start([emqx], #{work_dir => ?config(priv_dir, Config)}),
+    [{suite_apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([]),
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
 init_per_testcase(TC, Config) ->
     SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]),
     ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]),
-    ok = emqx_common_test_helpers:start_app(
+    Started = emqx_cth_suite:start_app(
         emqx_ft,
-        fun(emqx_ft) ->
-            emqx_ft_test_helpers:load_config(#{
-                <<"enable">> => true,
-                <<"storage">> => #{
-                    <<"local">> => #{
-                        <<"enable">> => true,
-                        <<"segments">> => #{<<"root">> => SegmentsRoot},
-                        <<"exporter">> => #{
-                            <<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot}
-                        }
+        #{
+            config => emqx_ft_test_helpers:config(#{
+                <<"local">> => #{
+                    <<"enable">> => true,
+                    <<"segments">> => #{<<"root">> => SegmentsRoot},
+                    <<"exporter">> => #{
+                        <<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot}
                     }
                 }
             })
-        end
+        }
     ),
     ok = snabbkaffe:start_trace(),
-    Config.
+    [{tc_apps, Started} | Config].
 
-end_per_testcase(_TC, _Config) ->
+end_per_testcase(_TC, Config) ->
     ok = snabbkaffe:stop(),
-    ok = application:stop(emqx_ft),
+    ok = emqx_cth_suite:stop_apps(?config(tc_apps, Config)),
     ok.
 
 %%

+ 3 - 0
apps/emqx_ft/test/emqx_ft_test_helpers.erl

@@ -49,6 +49,9 @@ env_handler(Config) ->
             ok
     end.
 
+config(Storage) ->
+    #{<<"file_transfer">> => #{<<"enable">> => true, <<"storage">> => Storage}}.
+
 local_storage(Config) ->
     local_storage(Config, #{exporter => local}).
 

+ 2 - 1
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -32,7 +32,8 @@ init_suite(Apps, SetConfigs) when is_function(SetConfigs) ->
 init_suite(Apps, SetConfigs, Opts) ->
     application:load(emqx_management),
     emqx_common_test_helpers:start_apps(Apps ++ [emqx_dashboard], SetConfigs, Opts),
-    emqx_common_test_http:create_default_app().
+    _ = emqx_common_test_http:create_default_app(),
+    ok.
 
 end_suite() ->
     end_suite([]).

+ 29 - 82
apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl

@@ -28,13 +28,12 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    [application:load(App) || App <- apps_to_start() ++ apps_to_load()],
     Config.
 
 end_per_suite(_Config) ->
     ok.
 
-init_per_testcase(t_import_on_cluster, Config) ->
+init_per_testcase(TC = t_import_on_cluster, Config0) ->
     %% Don't import listeners to avoid port conflicts
     %% when the same conf will be imported to another cluster
     meck:new(emqx_mgmt_listeners_conf, [passthrough]),
@@ -51,23 +50,25 @@ init_per_testcase(t_import_on_cluster, Config) ->
         1,
         {ok, #{changed => [], root_key => gateway}}
     ),
+    Config = [{tc_name, TC} | Config0],
     [{cluster, cluster(Config)} | setup(Config)];
-init_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
+init_per_testcase(TC = t_verify_imported_mnesia_tab_on_cluster, Config0) ->
+    Config = [{tc_name, TC} | Config0],
     [{cluster, cluster(Config)} | setup(Config)];
 init_per_testcase(t_mnesia_bad_tab_schema, Config) ->
     meck:new(emqx_mgmt_data_backup, [passthrough]),
-    meck:expect(emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]),
-    setup(Config);
-init_per_testcase(_TestCase, Config) ->
-    setup(Config).
+    meck:expect(TC = emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]),
+    setup([{tc_name, TC} | Config]);
+init_per_testcase(TC, Config) ->
+    setup([{tc_name, TC} | Config]).
 
 end_per_testcase(t_import_on_cluster, Config) ->
-    cleanup_cluster(?config(cluster, Config)),
+    emqx_cth_cluster:stop(?config(cluster, Config)),
     cleanup(Config),
     meck:unload(emqx_mgmt_listeners_conf),
     meck:unload(emqx_gateway_conf);
 end_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
-    cleanup_cluster(?config(cluster, Config)),
+    emqx_cth_cluster:stop(?config(cluster, Config)),
     cleanup(Config);
 end_per_testcase(t_mnesia_bad_tab_schema, Config) ->
     cleanup(Config),
@@ -356,8 +357,6 @@ t_mnesia_bad_tab_schema(_Config) ->
 
 t_read_files(_Config) ->
     DataDir = emqx:data_dir(),
-    %% Relative "data" path is set in init_per_testcase/2, asserting it must be safe
-    ?assertEqual("data", DataDir),
     {ok, Cwd} = file:get_cwd(),
     AbsDataDir = filename:join(Cwd, DataDir),
     FileBaseName = "t_read_files_tmp_file",
@@ -388,30 +387,12 @@ t_read_files(_Config) ->
 %%------------------------------------------------------------------------------
 
 setup(Config) ->
-    %% avoid port conflicts if the cluster is started
-    AppHandler = fun
-        (emqx_dashboard) ->
-            ok = emqx_config:put([dashboard, listeners, http, bind], 0);
-        (_) ->
-            ok
-    end,
-    ok = emqx_common_test_helpers:start_apps(apps_to_start(), AppHandler),
-    PrevDataDir = application:get_env(emqx, data_dir),
-    application:set_env(emqx, data_dir, "data"),
-    [{previous_emqx_data_dir, PrevDataDir} | Config].
+    WorkDir = filename:join(work_dir(Config), local),
+    Started = emqx_cth_suite:start(apps_to_start(), #{work_dir => WorkDir}),
+    [{suite_apps, Started} | Config].
 
 cleanup(Config) ->
-    emqx_common_test_helpers:stop_apps(apps_to_start()),
-    case ?config(previous_emqx_data_dir, Config) of
-        undefined ->
-            application:unset_env(emqx, data_dir);
-        {ok, Val} ->
-            application:set_env(emqx, data_dir, Val)
-    end.
-
-cleanup_cluster(ClusterNodes) ->
-    [rpc:call(N, ekka, leave, []) || N <- lists:reverse(ClusterNodes)],
-    [emqx_common_test_helpers:stop_slave(N) || N <- ClusterNodes].
+    emqx_cth_suite:stop(?config(suite_apps, Config)).
 
 users(Prefix) ->
     [
@@ -428,50 +409,18 @@ recompose_version(MajorInt, MinorInt, Patch) ->
     ).
 
 cluster(Config) ->
-    PrivDataDir = ?config(priv_dir, Config),
-    [{Core1, Core1Opts}, {Core2, Core2Opts}, {Replicant, ReplOpts}] =
-        emqx_common_test_helpers:emqx_cluster(
-            [
-                {core, data_backup_core1},
-                {core, data_backup_core2},
-                {replicant, data_backup_replicant}
-            ],
-            #{
-                priv_data_dir => PrivDataDir,
-                schema_mod => emqx_conf_schema,
-                apps => apps_to_start(),
-                load_apps => apps_to_start() ++ apps_to_load(),
-                env => [{mria, db_backend, rlog}],
-                load_schema => true,
-                start_autocluster => true,
-                listener_ports => [],
-                conf => [{[dashboard, listeners, http, bind], 0}],
-                env_handler =>
-                    fun(_) ->
-                        application:set_env(emqx, boot_modules, [broker, router])
-                    end
-            }
-        ),
-    Node1 = emqx_common_test_helpers:start_slave(Core1, Core1Opts),
-    Node2 = emqx_common_test_helpers:start_slave(Core2, Core2Opts),
-    #{conf := _ReplConf, env := ReplEnv} = ReplOpts,
-    ClusterDiscovery = {static, [{seeds, [Node1, Node2]}]},
-    ReplOpts1 = maps:remove(
-        join_to,
-        ReplOpts#{
-            env => [{ekka, cluster_discovery, ClusterDiscovery} | ReplEnv],
-            env_handler => fun(_) ->
-                application:set_env(emqx, boot_modules, [broker, router]),
-                application:set_env(
-                    ekka,
-                    cluster_discovery,
-                    ClusterDiscovery
-                )
-            end
-        }
+    Nodes = emqx_cth_cluster:start(
+        [
+            {data_backup_core1, #{role => core, apps => apps_to_start()}},
+            {data_backup_core2, #{role => core, apps => apps_to_start()}},
+            {data_backup_replicant, #{role => replicant, apps => apps_to_start()}}
+        ],
+        #{work_dir => work_dir(Config)}
     ),
-    ReplNode = emqx_common_test_helpers:start_slave(Replicant, ReplOpts1),
-    [Node1, Node2, ReplNode].
+    Nodes.
+
+work_dir(Config) ->
+    filename:join(?config(priv_dir, Config), ?config(tc_name, Config)).
 
 create_test_tab(Attributes) ->
     ok = mria:create_table(data_backup_test, [
@@ -491,8 +440,8 @@ create_test_tab(Attributes) ->
 
 apps_to_start() ->
     [
-        emqx,
-        emqx_conf,
+        {emqx_conf, "dashboard.listeners.http.bind = 0"},
+        {emqx, #{override_env => [{boot_modules, [broker, router]}]}},
         emqx_psk,
         emqx_management,
         emqx_dashboard,
@@ -505,11 +454,9 @@ apps_to_start() ->
         emqx_gateway,
         emqx_exhook,
         emqx_bridge,
-        emqx_auto_subscribe
-    ].
+        emqx_auto_subscribe,
 
-apps_to_load() ->
-    [
+        % loaded only
         emqx_gateway_lwm2m,
         emqx_gateway_coap,
         emqx_gateway_exproto,