Просмотр исходного кода

Merge pull request #2490 from emqx/develop

Merge develop to dev32
turtleDeng 6 лет назад
Родитель
Сommit
79c2879db6

+ 5 - 2
Makefile

@@ -20,8 +20,11 @@ ERLC_OPTS += +debug_info -DAPPLICATION=emqx
 BUILD_DEPS = cuttlefish
 dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1
 
-TEST_DEPS = meck
-dep_meck = hex-emqx 0.8.13
+CUR_BRANCH := $(shell git branch | grep -e "^*" | cut -d' ' -f 2)
+BRANCH := $(if $(filter $(CUR_BRANCH), master develop), $(CUR_BRANCH), develop)
+
+TEST_DEPS = emqx_ct_helpers
+dep_emqx_ct_helpers = git-emqx https://github.com/emqx/emqx-ct-helpers.git v1.0
 
 TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
 

+ 2 - 2
etc/emqx.conf

@@ -694,7 +694,7 @@ zone.external.flapping_threshold = 10, 1m
 ## -s: second
 ##
 ## Default: 1h, 1 hour
-zone.external.flapping_expiry_interval = 1h
+zone.external.flapping_banned_expiry_interval = 1h
 
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##
@@ -789,7 +789,7 @@ zone.internal.flapping_threshold = 10, 1m
 ## -s: second
 ##
 ## Default: 1h, 1 hour
-zone.internal.flapping_expiry_interval = 1h
+zone.internal.flapping_banned_expiry_interval = 1h
 
 ## All the topics will be prefixed with the mountpoint path if this option is enabled.
 ##

+ 1 - 1
priv/emqx.schema

@@ -836,7 +836,7 @@ end}.
   {datatype, string}
 ]}.
 
-{mapping, "zone.$name.flapping_expiry_interval", "emqx.zones", [
+{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [
   {datatype, {duration, s}}
 ]}.
 

+ 10 - 10
rebar.config

@@ -1,17 +1,17 @@
 {deps, [{jsx, "2.9.0"},
         {gproc, "0.8.0"},
-        {cowboy, "2.6.1"},
-        {meck, "0.8.13"} %% temp workaround for version check
-       ]}.
+        {cowboy, "2.6.1"}]}.
 
 %% appended to deps in rebar.config.script
-{github_emqx_deps,
-    [{gen_rpc, "2.3.1"},
-     {ekka, "v0.5.4"},
-     {replayq, "v0.1.1"},
-     {esockd, "v5.4.4"},
-     {cuttlefish, "v2.2.1"}
-    ]}.
+{github_emqx_libs,
+ [{gen_rpc, "2.3.1"},
+  {ekka, "v0.5.4"},
+  {replayq, "v0.1.1"},
+  {esockd, "v5.4.4"},
+  {cuttlefish, "v2.2.1"}]}.
+
+{github_emqx_projects,
+ [{emqx_ct_helpers, "v1.0"}]}.
 
 {edoc_opts, [{preprocess, true}]}.
 {erl_opts, [warn_unused_vars,

+ 22 - 4
rebar.config.script

@@ -16,10 +16,28 @@ CONFIG1 = case os:getenv("TRAVIS") of
                   CONFIG
           end,
 
-{_, Deps} = lists:keyfind(deps, 1, CONFIG1),
-{_, OurDeps} = lists:keyfind(github_emqx_deps, 1, CONFIG1),
+FindDeps = fun(DepsType, Config) ->
+                   case lists:keyfind(DepsType, 1, Config) of
+                       {_, RawDeps} -> RawDeps;
+                       _ -> []
+                   end
+           end,
+Deps = FindDeps(deps, CONFIG1),
+LibDeps = FindDeps(github_emqx_libs, CONFIG1),
+ProjDeps = FindDeps(github_emqx_projects, CONFIG1),
 UrlPrefix = "https://github.com/emqx/",
-NewDeps = Deps ++ [{Name, {git, UrlPrefix ++ atom_to_list(Name), {branch, Branch}}} || {Name, Branch} <- OurDeps],
-CONFIG2 = lists:keystore(deps, 1, CONFIG1, {deps, NewDeps}),
+RealName = fun TransName([$_ | Tail], Result) ->
+                   TransName(Tail, [$- | Result]);
+               TransName([Head | Tail], Result) ->
+                   TransName(Tail, [Head | Result]);
+               TransName([], Result) ->
+                   lists:reverse(Result)
+           end,
+
+NewLibDeps = [{LibName, {git, UrlPrefix ++ atom_to_list(LibName), {branch, Branch}}}
+              || {LibName, Branch} <- LibDeps],
+NewProjDeps = [{ProjName, {git, UrlPrefix ++ RealName(atom_to_list(ProjName), []), {branch, Branch}}} || {ProjName, Branch} <- ProjDeps],
 
+NewDeps = Deps ++ NewLibDeps ++ NewProjDeps,
+CONFIG2 = lists:keystore(deps, 1, CONFIG1, {deps, NewDeps}),
 CONFIG2.

+ 0 - 1
src/emqx_access_control.erl

@@ -15,7 +15,6 @@
 -module(emqx_access_control).
 
 -include("emqx.hrl").
--include("logger.hrl").
 
 -export([authenticate/1]).
 

+ 0 - 1
src/emqx_cm.erl

@@ -207,4 +207,3 @@ stats_fun() ->
         undefined -> ok;
         Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
     end.
-

+ 16 - 41
src/emqx_flapping.erl

@@ -15,7 +15,6 @@
 -module(emqx_flapping).
 
 -include("emqx.hrl").
--include("logger.hrl").
 -include("types.hrl").
 
 -behaviour(gen_statem).
@@ -50,27 +49,23 @@
 %% the expiry time unit is minutes.
 -spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()).
 init_flapping(ClientId, Interval) ->
-    #flapping{ client_id = ClientId
-             , check_count = 1
-             , timestamp = emqx_time:now_secs() + Interval
-             }.
+    #flapping{client_id = ClientId,
+              check_count = 1,
+              timestamp = emqx_time:now_secs() + Interval}.
 
 %% @doc This function is used to initialize flapping records
 %% the expiry time unit is minutes.
--spec(check( Action :: atom()
-           , ClientId :: binary()
-           , Threshold :: {integer(), integer()})
-      -> flapping_state()).
+-spec(check(Action :: atom(), ClientId :: binary(),
+            Threshold :: {integer(), integer()}) -> flapping_state()).
 check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) ->
     check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)).
 
--spec(check( Action :: atom()
-           , ClientId :: binary()
-           , Threshold :: {integer(), integer()}
-           , InitFlapping :: flapping_record())
-      -> flapping_state()).
+-spec(check(Action :: atom(), ClientId :: binary(),
+            Threshold :: {integer(), integer()},
+            InitFlapping :: flapping_record()) -> flapping_state()).
 check(Action, ClientId, Threshold, InitFlapping) ->
-    try ets:update_counter(?FLAPPING_TAB, ClientId, {_Pos = #flapping.check_count, 1}) of
+    case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of
+        1 -> ok;
         CheckCount ->
             case ets:lookup(?FLAPPING_TAB, ClientId) of
                 [Flapping] ->
@@ -78,23 +73,14 @@ check(Action, ClientId, Threshold, InitFlapping) ->
                 _Flapping ->
                     ok
             end
-    catch
-        error:badarg ->
-            ets:insert_new(?FLAPPING_TAB, InitFlapping),
-            ok
     end.
 
--spec(check_flapping( Action :: atom()
-                    , CheckTimes :: integer()
-                    , Threshold :: {integer(), integer()}
-                    , InitFlapping :: flapping_record())
-      -> flapping_state()).
-check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval},
+check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
                Flapping = #flapping{ client_id = ClientId
                                    , timestamp = Timestamp }) ->
     case emqx_time:now_secs() of
         NowTimestamp when NowTimestamp =< Timestamp,
-                          CheckTimes > TimesThreshold ->
+                          CheckCount > TimesThreshold ->
             ets:delete(?FLAPPING_TAB, ClientId),
             flapping;
         NowTimestamp when NowTimestamp > Timestamp,
@@ -110,7 +96,7 @@ check_flapping(Action, CheckTimes, _Threshold = {TimesThreshold, TimeInterval},
 %%--------------------------------------------------------------------
 %% gen_statem callbacks
 %%--------------------------------------------------------------------
--spec(start_link(TimerInterval :: integer()) -> startlink_ret()).
+-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()).
 start_link(TimerInterval) ->
     gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []).
 
@@ -145,17 +131,6 @@ terminate(_Reason, _StateName, _State) ->
 
 %% @doc clean expired records in ets
 clean_expired_records() ->
-    Records = ets:tab2list(?FLAPPING_TAB),
-    traverse_records(Records).
-
-traverse_records([]) ->
-    ok;
-traverse_records([#flapping{client_id = ClientId,
-                            timestamp = Timestamp} | LeftRecords]) ->
-    case emqx_time:now_secs() > Timestamp of
-        true ->
-            ets:delete(?FLAPPING_TAB, ClientId);
-        false ->
-            true
-    end,
-    traverse_records(LeftRecords).
+    NowTime = emqx_time:now_secs(),
+    MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}],
+    ets:select_delete(?FLAPPING_TAB, MatchSpec).

+ 3 - 4
src/emqx_protocol.erl

@@ -943,16 +943,15 @@ flag(true)  -> 1.
 do_flapping_detect(Action, #pstate{zone = Zone,
                                    client_id = ClientId,
                                    enable_flapping_detect = true}) ->
-    ExpiryInterval = emqx_zone:get_env(Zone, flapping_expiry_interval, 3600000),
+    BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
     Threshold = emqx_zone:get_env(Zone, flapping_threshold, 20),
-    Until = erlang:system_time(second) + ExpiryInterval,
+    Until = erlang:system_time(second) + BanExpiryInterval,
     case emqx_flapping:check(Action, ClientId, Threshold) of
         flapping ->
             emqx_banned:add(#banned{who = {client_id, ClientId},
                                     reason = <<"flapping">>,
                                     by = <<"flapping_checker">>,
-                                    until = Until
-                                   }),
+                                    until = Until}),
             ok;
         _Other ->
             ok

+ 0 - 70
src/emqx_rate_limiter.erl

@@ -1,70 +0,0 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-
--module(emqx_rate_limiter).
-
--behaviour(gen_server).
-
-%% API
--export([start_link/0]).
-
-%% gen_server callbacks
--export([ init/1
-        , handle_call/3
-        , handle_cast/2
-        , handle_info/2
-        , terminate/2
-        , code_change/3
-        ]).
-
--define(SERVER, ?MODULE).
-
--record(state, {}).
-
-%%------------------------------------------------------------------------------
-%%% API
-%%------------------------------------------------------------------------------
-
-%% @doc Starts the server
--spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%%------------------------------------------------------------------------------
-%%% gen_server callbacks
-%%------------------------------------------------------------------------------
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_call(_Request, _From, State) ->
-    Reply = ok,
-    {reply, Reply, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%------------------------------------------------------------------------------
-%%% Internal functions
-%%------------------------------------------------------------------------------
-

+ 15 - 19
test/emqx_SUITE.erl

@@ -70,25 +70,21 @@ all() ->
 
 groups() ->
     [{connect, [non_parallel_tests],
-      [
-      mqtt_connect,
-      mqtt_connect_with_tcp,
-      mqtt_connect_with_will_props,
-      mqtt_connect_with_ssl_oneway,
-      mqtt_connect_with_ssl_twoway,
-      mqtt_connect_with_ws
-      ]},
-    {publish, [non_parallel_tests],
-      [
-      packet_size
-      ]}].
+      [mqtt_connect,
+       mqtt_connect_with_tcp,
+       mqtt_connect_with_will_props,
+       mqtt_connect_with_ssl_oneway,
+       mqtt_connect_with_ssl_twoway,
+       mqtt_connect_with_ws]},
+     {publish, [non_parallel_tests],
+      [packet_size]}].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 %%--------------------------------------------------------------------
 %% Protocol Test
@@ -127,9 +123,9 @@ mqtt_connect_with_will_props(_) ->
 
 mqtt_connect_with_ssl_oneway(_) ->
     emqx:shutdown(),
-    emqx_ct_broker_helpers:change_opts(ssl_oneway),
+    emqx_ct_helpers:change_emqx_opts(ssl_oneway),
     emqx:start(),
-    ClientSsl = emqx_ct_broker_helpers:client_ssl(),
+    ClientSsl = emqx_ct_helpers:client_ssl(),
     {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock}
     = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
     Packet = raw_send_serialize(?CLIENT),
@@ -145,11 +141,11 @@ mqtt_connect_with_ssl_oneway(_) ->
 
 mqtt_connect_with_ssl_twoway(_Config) ->
     emqx:shutdown(),
-    emqx_ct_broker_helpers:change_opts(ssl_twoway),
+    emqx_ct_helpers:change_emqx_opts(ssl_twoway),
     emqx:start(),
-    ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(),
+    ClientSsl = emqx_ct_helpers:client_ssl_twoway(),
     {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock}
-    = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
+        = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
     Packet = raw_send_serialize(?CLIENT),
     emqx_client_sock:setopts(Sock, [{active, once}]),
     emqx_client_sock:send(Sock, Packet),

+ 19 - 27
test/emqx_access_SUITE.erl

@@ -31,46 +31,41 @@ all() ->
     [{group, access_control},
      {group, acl_cache},
      {group, access_control_cache_mode},
-     {group, access_rule}
-     ].
+     {group, access_rule}].
 
 groups() ->
     [{access_control, [sequence],
-       [reload_acl,
-        register_mod,
-        unregister_mod,
-        check_acl_1,
-        check_acl_2
-        ]},
+      [reload_acl,
+       register_mod,
+       unregister_mod,
+       check_acl_1,
+       check_acl_2]},
      {access_control_cache_mode, [],
-       [
-        acl_cache_basic,
-        acl_cache_expiry,
-        acl_cache_cleanup,
-        acl_cache_full
-        ]},
-     {acl_cache, [], [
-       put_get_del_cache,
+      [acl_cache_basic,
+       acl_cache_expiry,
+       acl_cache_cleanup,
+       acl_cache_full]},
+     {acl_cache, [],
+      [put_get_del_cache,
        cache_update,
        cache_expiry,
        cache_replacement,
        cache_cleanup,
        cache_auto_emtpy,
-       cache_auto_cleanup
-     ]},
+       cache_auto_cleanup]},
      {access_rule, [],
-       [compile_rule,
-        match_rule]}].
+      [compile_rule,
+       match_rule]}].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teadown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
-init_per_group(Group, Config) when  Group =:= access_control;
-                                    Group =:= access_control_cache_mode ->
+init_per_group(Group, Config) when Group =:= access_control;
+                                   Group =:= access_control_cache_mode ->
     prepare_config(Group),
     application:load(emqx),
     Config;
@@ -97,7 +92,6 @@ set_acl_config_file(_Group) ->
     write_config("access_SUITE_acl.conf", Rules),
     application:set_env(emqx, acl_file, "access_SUITE_acl.conf").
 
-
 write_config(Filename, Terms) ->
     file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]).
 
@@ -105,7 +99,6 @@ end_per_group(_Group, Config) ->
     Config.
 
 init_per_testcase(_TestCase, Config) ->
-    %% {ok, _Pid} =
     ?AC:start_link(),
     Config.
 end_per_testcase(_TestCase, _Config) ->
@@ -116,7 +109,6 @@ per_testcase_config(acl_cache_full, Config) ->
 per_testcase_config(_TestCase, Config) ->
     Config.
 
-
 %%--------------------------------------------------------------------
 %% emqx_access_control
 %%--------------------------------------------------------------------

+ 5 - 38
test/emqx_alarm_handler_SUITE.erl

@@ -24,51 +24,18 @@
 -include("emqx_mqtt.hrl").
 -include("emqx.hrl").
 
-all() -> [t_alarm_handler, t_logger_handler].
+all() -> [t_alarm_handler,
+          t_logger_handler].
 
 init_per_suite(Config) ->
-    [start_apps(App, {SchemaFile, ConfigFile}) ||
-        {App, SchemaFile, ConfigFile}
-            <- [{emqx, local_path("priv/emqx.schema"),
-                       local_path("etc/gen.emqx.conf")}]],
+    emqx_ct_helpers:start_apps([], fun set_special_configs/1),
     Config.
 
 end_per_suite(_Config) ->
-    application:stop(emqx).
-
-local_path(RelativePath) ->
-    filename:join([get_base_dir(), RelativePath]).
-
-deps_path(App, RelativePath) ->
-    %% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
-    %% but priv dir is
-    Path0 = code:priv_dir(App),
-    Path = case file:read_link(Path0) of
-               {ok, Resolved} -> Resolved;
-               {error, _} -> Path0
-           end,
-    filename:join([Path, "..", RelativePath]).
-
-get_base_dir() ->
-    {file, Here} = code:is_loaded(?MODULE),
-    filename:dirname(filename:dirname(Here)).
-
-start_apps(App, {SchemaFile, ConfigFile}) ->
-    read_schema_configs(App, {SchemaFile, ConfigFile}),
-    set_special_configs(App),
-    application:ensure_all_started(App).
-
-read_schema_configs(App, {SchemaFile, ConfigFile}) ->
-    ct:pal("Read configs - SchemaFile: ~p, ConfigFile: ~p", [SchemaFile, ConfigFile]),
-    Schema = cuttlefish_schema:files([SchemaFile]),
-    Conf = conf_parse:file(ConfigFile),
-    NewConfig = cuttlefish_generator:map(Schema, Conf),
-    Vals = proplists:get_value(App, NewConfig, []),
-    [application:set_env(App, Par, Value) || {Par, Value} <- Vals].
+    emqx_ct_helpers:stop_apps([]).
 
 set_special_configs(emqx) ->
-    application:set_env(emqx, acl_file, deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
-
+    application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
 set_special_configs(_App) ->
     ok.
 

+ 2 - 3
test/emqx_banned_SUITE.erl

@@ -24,7 +24,7 @@
 all() -> [t_banned_all].
 
 t_banned_all(_) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     emqx_banned:start_link(),
     TimeNow = erlang:system_time(second),
     Banned = #banned{who = {client_id, <<"TestClient">>},
@@ -49,5 +49,4 @@ t_banned_all(_) ->
     ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>,
                                    username => undefined,
                                    peername => {undefined, undefined}})),
-    emqx_ct_broker_helpers:run_teardown_steps().
-
+    emqx_ct_helpers:stop_apps([]).

+ 22 - 24
test/emqx_bridge_SUITE.erl

@@ -14,11 +14,12 @@
 
 -module(emqx_bridge_SUITE).
 
--export([all/0, init_per_suite/1, end_per_suite/1]).
--export([t_rpc/1,
-         t_mqtt/1,
-         t_mngr/1
-        ]).
+-export([ all/0
+        , init_per_suite/1
+        , end_per_suite/1]).
+-export([ t_rpc/1
+        , t_mqtt/1
+        , t_mngr/1]).
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
@@ -27,21 +28,21 @@
 
 -define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
 
-all() -> [t_rpc,
-          t_mqtt,
-          t_mngr].
+all() -> [ t_rpc
+         , t_mqtt
+         , t_mngr].
 
 init_per_suite(Config) ->
     case node() of
-        nonode@nohost ->
-            net_kernel:start(['emqx@127.0.0.1', longnames]);
-        _ ->
-            ok
+        nonode@nohost -> net_kernel:start(['emqx@127.0.0.1', longnames]);
+        _ -> ok
     end,
-    emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]).
+    emqx_ct_helpers:start_apps([]),
+    emqx_logger:set_log_level(error),
+    [{log_level, error} | Config].
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 t_mngr(Config) when is_list(Config) ->
     Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
@@ -50,8 +51,7 @@ t_mngr(Config) when is_list(Config) ->
             connect_module => emqx_bridge_rpc,
             mountpoint => <<"forwarded">>,
             subscriptions => Subs,
-            start_type => auto
-           },
+            start_type => auto},
     Name = ?FUNCTION_NAME,
     {ok, Pid} = emqx_bridge:start_link(Name, Cfg),
     try
@@ -77,8 +77,7 @@ t_rpc(Config) when is_list(Config) ->
             forwards => [<<"t_rpc/#">>],
             connect_module => emqx_bridge_rpc,
             mountpoint => <<"forwarded">>,
-            start_type => auto
-           },
+            start_type => auto},
     {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg),
     ClientId = <<"ClientId">>,
     try
@@ -132,8 +131,7 @@ t_mqtt(Config) when is_list(Config) ->
             %% Consume back to forwarded message for verification
             %% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2
             subscriptions => [{ForwardedTopic, _QoS = 1}],
-            start_type => auto
-           },
+            start_type => auto},
     Tester = self(),
     Ref = make_ref(),
     meck:new(emqx_bridge, [passthrough, no_history]),
@@ -156,14 +154,14 @@ t_mqtt(Config) when is_list(Config) ->
         Max = 100,
         Msgs = lists:seq(1, Max),
         lists:foreach(fun(I) ->
-                              Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)),
-                              emqx_session:publish(SPid, I, Msg)
+                          Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)),
+                          emqx_session:publish(SPid, I, Msg)
                       end, Msgs),
         ok = receive_and_match_messages(Ref, Msgs),
         Msgs2 = lists:seq(Max + 1, Max * 2),
         lists:foreach(fun(I) ->
-                              Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
-                              emqx_session:publish(SPid, I, Msg)
+                          Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
+                          emqx_session:publish(SPid, I, Msg)
                       end, Msgs2),
         ok = receive_and_match_messages(Ref, Msgs2),
         emqx_mock_client:close_session(ConnPid)

+ 4 - 6
test/emqx_broker_SUITE.erl

@@ -32,23 +32,21 @@ all() ->
      {group, stats}].
 
 groups() ->
-    [
-     {pubsub, [sequence], [subscribe_unsubscribe,
+    [{pubsub, [sequence], [subscribe_unsubscribe,
                            publish, pubsub,
                            t_shared_subscribe,
                            dispatch_with_no_sub,
                            'pubsub#', 'pubsub+']},
      {session, [sequence], [start_session]},
      {metrics, [sequence], [inc_dec_metric]},
-     {stats, [sequence], [set_get_stat]}
-    ].
+     {stats, [sequence], [set_get_stat]}].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 %%--------------------------------------------------------------------
 %% PubSub Test

+ 2 - 2
test/emqx_client_SUITE.erl

@@ -46,11 +46,11 @@ groups() ->
        dollar_topics_test]}].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 receive_messages(Count) ->
     receive_messages(Count, []).

+ 2 - 2
test/emqx_cm_SUITE.erl

@@ -31,11 +31,11 @@ groups() ->
        t_lookup_conn_pid]}].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 init_per_testcase(_TestCase, Config) ->
     register_connection(),

+ 2 - 2
test/emqx_connection_SUITE.erl

@@ -27,11 +27,11 @@ all() ->
     [t_connect_api].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 t_connect_api(_Config) ->
     {ok, T1} = emqx_client:start_link([{host, "localhost"},

+ 0 - 199
test/emqx_ct_broker_helpers.erl

@@ -1,199 +0,0 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-
--module(emqx_ct_broker_helpers).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--define(APP, emqx).
-
--define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"},
-                          {verify, verify_peer},
-                          {fail_if_no_peer_cert, true}]).
-
--define(MQTT_SSL_CLIENT, [{keyfile, "certs/client-key.pem"},
-                          {cacertfile, "certs/cacert.pem"},
-                          {certfile, "certs/client-cert.pem"}]).
-
--define(CIPHERS,    [{ciphers,
-                        ["ECDHE-ECDSA-AES256-GCM-SHA384",
-                         "ECDHE-RSA-AES256-GCM-SHA384",
-                         "ECDHE-ECDSA-AES256-SHA384",
-                         "ECDHE-RSA-AES256-SHA384","ECDHE-ECDSA-DES-CBC3-SHA",
-                         "ECDH-ECDSA-AES256-GCM-SHA384",
-                         "ECDH-RSA-AES256-GCM-SHA384",
-                         "ECDH-ECDSA-AES256-SHA384","ECDH-RSA-AES256-SHA384",
-                         "DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256",
-                         "AES256-GCM-SHA384","AES256-SHA256",
-                         "ECDHE-ECDSA-AES128-GCM-SHA256",
-                         "ECDHE-RSA-AES128-GCM-SHA256",
-                         "ECDHE-ECDSA-AES128-SHA256",
-                         "ECDHE-RSA-AES128-SHA256",
-                         "ECDH-ECDSA-AES128-GCM-SHA256",
-                         "ECDH-RSA-AES128-GCM-SHA256",
-                         "ECDH-ECDSA-AES128-SHA256","ECDH-RSA-AES128-SHA256",
-                         "DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256",
-                         "AES128-GCM-SHA256","AES128-SHA256",
-                         "ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA",
-                         "DHE-DSS-AES256-SHA","ECDH-ECDSA-AES256-SHA",
-                         "ECDH-RSA-AES256-SHA","AES256-SHA",
-                         "ECDHE-ECDSA-AES128-SHA","ECDHE-RSA-AES128-SHA",
-                         "DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA",
-                         "ECDH-RSA-AES128-SHA","AES128-SHA"]}]).
-
-run_setup_steps() ->
-    _ = run_setup_steps([]),
-    %% return ok to be backward compatible
-    ok.
-
-run_setup_steps(Config) ->
-    NewConfig = generate_config(),
-    lists:foreach(fun set_app_env/1, NewConfig),
-    set_bridge_env(),
-
-    {ok, _} = application:ensure_all_started(?APP),
-    set_log_level(Config),
-    Config.
-
-run_teardown_steps() ->
-    ?APP:shutdown().
-
-generate_config() ->
-    Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
-    Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]),
-    cuttlefish_generator:map(Schema, Conf).
-
-set_log_level(Config) ->
-    case proplists:get_value(log_level, Config) of
-        undefined -> ok;
-        Level -> emqx_logger:set_log_level(Level)
-    end.
-
-get_base_dir(Module) ->
-    {file, Here} = code:is_loaded(Module),
-    filename:dirname(filename:dirname(Here)).
-
-get_base_dir() ->
-    get_base_dir(?MODULE).
-
-local_path(Components, Module) ->
-    filename:join([get_base_dir(Module) | Components]).
-
-local_path(Components) ->
-    local_path(Components, ?MODULE).
-
-set_app_env({App, Lists}) ->
-    lists:foreach(fun({acl_file, _Var}) ->
-                      application:set_env(App, acl_file, local_path(["etc", "acl.conf"]));
-                     ({plugins_loaded_file, _Var}) ->
-                      application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"]));
-                     ({Par, Var}) ->
-                      application:set_env(App, Par, Var)
-                  end, Lists).
-
-set_bridge_env() ->
-    BridgeEnvs = bridge_conf(),
-    application:set_env(?APP, bridges, BridgeEnvs).
-
-change_opts(SslType) ->
-    {ok, Listeners} = application:get_env(?APP, listeners),
-    NewListeners =
-        lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
-                            case Protocol of
-                                ssl ->
-                                    SslOpts = proplists:get_value(ssl_options, Opts),
-                                    Keyfile = local_path(["etc/certs", "key.pem"]),
-                                    Certfile = local_path(["etc/certs", "cert.pem"]),
-                                    TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
-                                    TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
-                                    TupleList3 =
-                                        case SslType of
-                                            ssl_twoway->
-                                                CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
-                                                MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
-                                                lists:merge(TupleList2, MutSslList);
-                                            _ ->
-                                                lists:filter(fun ({cacertfile, _}) -> false;
-                                                                 ({verify, _}) -> false;
-                                                                 ({fail_if_no_peer_cert, _}) -> false;
-                                                                 (_) -> true
-                                                             end, TupleList2)
-                                        end,
-                                    [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc];
-                                _ ->
-                                    [Listener | Acc]
-                            end
-                    end, [], Listeners),
-    application:set_env(?APP, listeners, NewListeners).
-
-client_ssl_twoway() ->
-    [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT] ++ ?CIPHERS.
-
-client_ssl() ->
-    ?CIPHERS ++ [{reuse_sessions, true}].
-
-wait_mqtt_payload(Payload) ->
-    receive
-        {publish, #{payload := Payload}} ->
-            ct:pal("OK - received msg: ~p~n", [Payload])
-    after 1000 ->
-        ct:fail({timeout, Payload, {msg_box, flush()}})
-    end.
-
-not_wait_mqtt_payload(Payload) ->
-    receive
-        {publish, #{payload := Payload}} ->
-            ct:fail({received, Payload})
-    after 1000 ->
-        ct:pal("OK - msg ~p is not received", [Payload])
-    end.
-
-flush() ->
-    flush([]).
-flush(Msgs) ->
-    receive
-        M -> flush([M|Msgs])
-    after
-        0 -> lists:reverse(Msgs)
-    end.
-
-bridge_conf() ->
-    [ {local_rpc,
-        [{connect_module, emqx_bridge_rpc},
-         {address, node()},
-         {forwards, ["bridge-1/#", "bridge-2/#"]}
-        ]}
-    ].
-    % [{aws,
-    %   [{connect_module, emqx_bridge_mqtt},
-    %   {username,"user"},
-    %    {address,"127.0.0.1:1883"},
-    %    {clean_start,true},
-    %    {client_id,"bridge_aws"},
-    %    {forwards,["topic1/#","topic2/#"]},
-    %    {keepalive,60000},
-    %    {max_inflight,32},
-    %    {mountpoint,"bridge/aws/${node}/"},
-    %    {password,"passwd"},
-    %    {proto_ver,mqttv4},
-    %    {queue,
-    %     #{batch_coun t_limit => 1000,
-    %       replayq_dir => "data/emqx_aws_bridge/",
-    %       replayq_seg_bytes => 10485760}},
-    %    {reconnect_delay_ms,30000},
-    %    {ssl,false},
-    %    {ssl_opts,[{versions,[tlsv1,'tlsv1.1','tlsv1.2']}]},
-    %    {start_type,manual},
-    %    {subscriptions,[{"cmd/topic1",1},{"cmd/topic2",1}]}]}].

+ 0 - 68
test/emqx_ct_helpers.erl

@@ -1,68 +0,0 @@
-%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-
--module(emqx_ct_helpers).
-
--export([ensure_mnesia_stopped/0, wait_for/4]).
-
-ensure_mnesia_stopped() ->
-    ekka_mnesia:ensure_stopped(),
-    ekka_mnesia:delete_schema().
-
-%% Help function to wait for Fun to yield 'true'.
-wait_for(Fn, Ln, F, Timeout) ->
-    {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
-    wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
-
-wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
-    receive
-        {'DOWN', Mref, process, Pid, normal} ->
-            ok;
-        {'DOWN', Mref, process, Pid, {unexpected, Result}} ->
-            erlang:error({unexpected, Fn, Ln, Result});
-        {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} ->
-            erlang:raise(C, {Fn, Ln, E}, S)
-    after
-        Timeout ->
-            case Kill of
-                true ->
-                    erlang:demonitor(Mref, [flush]),
-                    erlang:exit(Pid, kill),
-                    erlang:error({Fn, Ln, timeout});
-                false ->
-                    Pid ! stop,
-                    wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
-            end
-    end.
-
-wait_loop(_F, ok) -> exit(normal);
-wait_loop(F, LastRes) ->
-    receive
-        stop -> erlang:exit(LastRes)
-    after
-        100 ->
-            Res = catch_call(F),
-            wait_loop(F, Res)
-    end.
-
-catch_call(F) ->
-    try
-        case F() of
-            true -> ok;
-            Other -> {unexpected, Other}
-        end
-    catch
-        C : E : S ->
-            {crashed, {C, E, S}}
-    end.

+ 2 - 2
test/emqx_flapping_SUITE.erl

@@ -26,12 +26,12 @@ all() ->
     [t_flapping].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     prepare_for_test(),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 t_flapping(_Config) ->
     process_flag(trap_exit, true),

+ 12 - 43
test/emqx_protocol_SUITE.erl

@@ -27,24 +27,21 @@
                  <<"/TopicA">>]).
 
 -define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
-                                client_id = <<"mqtt_client">>,
-                                username  = <<"emqx">>,
-                                password  = <<"public">>})).
+                                   client_id = <<"mqtt_client">>,
+                                   username  = <<"emqx">>,
+                                   password  = <<"public">>})).
 
 all() ->
-    [
-     {group, mqtt_common},
+    [{group, mqtt_common},
      {group, mqttv4},
      {group, mqttv5},
      {group, acl},
-     {group, frame_partial}
-    ].
+     {group, frame_partial}].
 
 groups() ->
     [{mqtt_common, [sequence],
       [will_topic_check,
-       will_acl_check
-       ]},
+       will_acl_check]},
      {mqttv4, [sequence],
       [connect_v4,
        subscribe_v4]},
@@ -54,18 +51,15 @@ groups() ->
      {acl, [sequence],
       [acl_deny_action_ct]},
      {frame_partial, [sequence],
-       [handle_followed_packet]}].
+      [handle_followed_packet]}].
 
 init_per_suite(Config) ->
-    [start_apps(App, SchemaFile, ConfigFile) ||
-        {App, SchemaFile, ConfigFile}
-            <- [{emqx, deps_path(emqx, "priv/emqx.schema"),
-                       deps_path(emqx, "etc/gen.emqx.conf")}]],
+    emqx_ct_helpers:start_apps([], fun set_special_configs/1),
     emqx_zone:set_env(external, max_topic_alias, 20),
     Config.
 
 end_per_suite(_Config) ->
-    application:stop(emqx).
+    emqx_ct_helpers:stop_apps([]).
 
 batch_connect(NumberOfConnections) ->
     batch_connect([], NumberOfConnections).
@@ -567,7 +561,7 @@ will_topic_check(_) ->
     emqx_client:stop(Client),
     ct:sleep(100),
     false = is_process_alive(Client),
-    emqx_ct_broker_helpers:wait_mqtt_payload(<<"I have died">>),
+    emqx_ct_helpers:wait_mqtt_payload(<<"I have died">>),
     emqx_client:stop(T).
 
 will_acl_check(_) ->
@@ -613,37 +607,12 @@ acl_deny_do_disconnect(subscribe, QoS, Topic) ->
     after 1000 -> ct:fail({timeout, wait_tcp_closed})
     end.
 
-start_apps(App, SchemaFile, ConfigFile) ->
-    read_schema_configs(App, SchemaFile, ConfigFile),
-    set_special_configs(App),
-    application:ensure_all_started(App).
-
-read_schema_configs(App, SchemaFile, ConfigFile) ->
-    Schema = cuttlefish_schema:files([SchemaFile]),
-    Conf = conf_parse:file(ConfigFile),
-    NewConfig = cuttlefish_generator:map(Schema, Conf),
-    Vals = proplists:get_value(App, NewConfig, []),
-    [application:set_env(App, Par, Value) || {Par, Value} <- Vals].
-
 set_special_configs(emqx) ->
     application:set_env(emqx, enable_acl_cache, false),
     application:set_env(emqx, plugins_loaded_file,
-                        deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")),
+                        emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")),
     application:set_env(emqx, acl_deny_action, disconnect),
     application:set_env(emqx, acl_file,
-                        deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
+                        emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
 set_special_configs(_App) ->
     ok.
-
-deps_path(App, RelativePath) ->
-    %% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
-    %% but priv dir is
-    Path0 = code:priv_dir(App),
-    Path = case file:read_link(Path0) of
-               {ok, Resolved} -> Resolved;
-               {error, _} -> Path0
-           end,
-    filename:join([Path, "..", RelativePath]).
-
-local_path(RelativePath) ->
-    deps_path(emqx_auth_username, RelativePath).

+ 2 - 3
test/emqx_router_SUITE.erl

@@ -36,11 +36,11 @@ groups() ->
        t_unexpected]}].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 init_per_testcase(_TestCase, Config) ->
     clear_tables(),
@@ -106,4 +106,3 @@ t_unexpected(_) ->
 
 clear_tables() ->
     lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]).
-

+ 3 - 3
test/emqx_rpc_SUITE.erl

@@ -24,13 +24,13 @@
 all() -> [t_rpc].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 t_rpc(_) ->
     60000 = emqx_rpc:call(?MASTER, timer, seconds, [60]),
     {badrpc, _} = emqx_rpc:call(?MASTER, os, test, []),
-    {_, []} = emqx_rpc:multicall([?MASTER, ?MASTER], os, timestamp, []).
+    {_, []} = emqx_rpc:multicall([?MASTER, ?MASTER], os, timestamp, []).

+ 2 - 2
test/emqx_session_SUITE.erl

@@ -24,11 +24,11 @@
 all() -> [ignore_loop, t_session_all].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 ignore_loop(_Config) ->
     application:set_env(emqx, mqtt_ignore_loop_deliver, true),

+ 3 - 5
test/emqx_shared_sub_SUITE.erl

@@ -37,15 +37,14 @@ all() -> [t_random_basic,
           t_sticky,
           t_hash,
           t_not_so_sticky,
-          t_no_connection_nack
-         ].
+          t_no_connection_nack].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 t_random_basic(_) ->
     ok = ensure_config(random),
@@ -258,4 +257,3 @@ ensure_config(Strategy, AckEnabled) ->
 
 subscribed(Group, Topic, Pid) ->
     lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
-

+ 2 - 2
test/emqx_sm_SUITE.erl

@@ -43,11 +43,11 @@ groups() ->
        t_lookup_session_pids]}].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 init_per_testcase(_All, Config) ->
     {ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => self()}),

+ 2 - 3
test/emqx_sys_mon_SUITE.erl

@@ -34,11 +34,11 @@
 all() -> [t_sys_mon].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 t_sys_mon(_Config) ->
     lists:foreach(fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) ->
@@ -64,4 +64,3 @@ validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
 concat_str(ValidateInfo, InfoOrPort, Info) ->
     WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]),
     lists:flatten(WarnInfo).
-

+ 2 - 2
test/emqx_tracer_SUITE.erl

@@ -24,11 +24,11 @@
 all() -> [start_traces].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 start_traces(_Config) ->
     {ok, T} = emqx_client:start_link([{host, "localhost"},

+ 2 - 2
test/emqx_ws_connection_SUITE.erl

@@ -39,11 +39,11 @@ all() ->
     [t_ws_connect_api].
 
 init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
+    emqx_ct_helpers:start_apps([]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
+    emqx_ct_helpers:stop_apps([]).
 
 t_ws_connect_api(_Config) ->
     WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),