Просмотр исходного кода

test(cluster-link): better isolate API test groups

Andrew Mayorov 1 год назад
Родитель
Сommit
7c52c3d2fe

+ 7 - 22
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -157,9 +157,9 @@ t_message_forwarding(Config) ->
     [SourceNode1 | _] = nodes_source(Config),
     [SourceNode1 | _] = nodes_source(Config),
     [TargetNode1, TargetNode2 | _] = nodes_target(Config),
     [TargetNode1, TargetNode2 | _] = nodes_target(Config),
 
 
-    SourceC1 = start_client("t_message_forwarding", SourceNode1),
-    TargetC1 = start_client("t_message_forwarding1", TargetNode1),
-    TargetC2 = start_client("t_message_forwarding2", TargetNode2),
+    SourceC1 = emqx_cluster_link_cth:connect_client("t_message_forwarding", SourceNode1),
+    TargetC1 = emqx_cluster_link_cth:connect_client("t_message_forwarding1", TargetNode1),
+    TargetC2 = emqx_cluster_link_cth:connect_client("t_message_forwarding2", TargetNode2),
     IsShared = ?config(is_shared_sub, Config),
     IsShared = ?config(is_shared_sub, Config),
 
 
     {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/+">>), qos1),
     {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/+">>), qos1),
@@ -198,9 +198,9 @@ t_target_extrouting_gc('end', Config) ->
 t_target_extrouting_gc(Config) ->
 t_target_extrouting_gc(Config) ->
     [SourceNode1 | _] = nodes_source(Config),
     [SourceNode1 | _] = nodes_source(Config),
     [TargetNode1, TargetNode2 | _] = nodes_target(Config),
     [TargetNode1, TargetNode2 | _] = nodes_target(Config),
-    SourceC1 = start_client("t_target_extrouting_gc", SourceNode1),
-    TargetC1 = start_client_unlink("t_target_extrouting_gc1", TargetNode1),
-    TargetC2 = start_client_unlink("t_target_extrouting_gc2", TargetNode2),
+    SourceC1 = emqx_cluster_link_cth:connect_client("t_target_extrouting_gc", SourceNode1),
+    TargetC1 = emqx_cluster_link_cth:connect_client_unlink("t_target_extrouting_gc1", TargetNode1),
+    TargetC2 = emqx_cluster_link_cth:connect_client_unlink("t_target_extrouting_gc2", TargetNode2),
     IsShared = ?config(is_shared_sub, Config),
     IsShared = ?config(is_shared_sub, Config),
 
 
     TopicFilter1 = <<"t/+">>,
     TopicFilter1 = <<"t/+">>,
@@ -290,7 +290,7 @@ t_disconnect_on_errors(Config) ->
     ct:timetrap({seconds, 20}),
     ct:timetrap({seconds, 20}),
     [SN1 | _] = nodes_source(Config),
     [SN1 | _] = nodes_source(Config),
     [TargetNode] = nodes_target(Config),
     [TargetNode] = nodes_target(Config),
-    SC1 = start_client("t_disconnect_on_errors", SN1),
+    SC1 = emqx_cluster_link_cth:connect_client("t_disconnect_on_errors", SN1),
     ok = ?ON(SN1, meck:new(emqx_cluster_link, [passthrough, no_link, no_history])),
     ok = ?ON(SN1, meck:new(emqx_cluster_link, [passthrough, no_link, no_history])),
     ?assertMatch(
     ?assertMatch(
         {_, {ok, _}},
         {_, {ok, _}},
@@ -322,20 +322,5 @@ maybe_shared_topic(true = _IsShared, Topic) ->
 maybe_shared_topic(false = _IsShared, Topic) ->
 maybe_shared_topic(false = _IsShared, Topic) ->
     Topic.
     Topic.
 
 
-start_client_unlink(ClientId, Node) ->
-    Client = start_client(ClientId, Node),
-    _ = erlang:unlink(Client),
-    Client.
-
-start_client(ClientId, Node) ->
-    Port = tcp_port(Node),
-    {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
-    {ok, _} = emqtt:connect(Client),
-    Client.
-
-tcp_port(Node) ->
-    {_Host, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
-    Port.
-
 fmt(Fmt, Args) ->
 fmt(Fmt, Args) ->
     emqx_utils:format(Fmt, Args).
     emqx_utils:format(Fmt, Args).

+ 159 - 149
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -14,9 +14,6 @@
 
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 
--define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])).
--define(CONF_PATH, [cluster, links]).
-
 -define(CACERT, <<
 -define(CACERT, <<
     "-----BEGIN CERTIFICATE-----\n"
     "-----BEGIN CERTIFICATE-----\n"
     "MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV\n"
     "MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV\n"
@@ -48,15 +45,16 @@
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
 all() ->
 all() ->
-    AllTCs = emqx_common_test_helpers:all(?MODULE),
-    OtherTCs = AllTCs -- cluster_test_cases(),
+    [{group, cluster}, {group, local}].
+
+groups() ->
     [
     [
-        {group, cluster}
-        | OtherTCs
+        {cluster, cluster_test_cases()},
+        {local, local_test_cases()}
     ].
     ].
 
 
-groups() ->
-    [{cluster, cluster_test_cases()}].
+local_test_cases() ->
+    emqx_common_test_helpers:all(?MODULE) -- cluster_test_cases().
 
 
 cluster_test_cases() ->
 cluster_test_cases() ->
     [
     [
@@ -68,6 +66,12 @@ cluster_test_cases() ->
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     %% This is called by emqx_machine in EMQX release
     %% This is called by emqx_machine in EMQX release
     emqx_otel_app:configure_otel_deps(),
     emqx_otel_app:configure_otel_deps(),
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_group(local = _Group, Config) ->
     Apps = emqx_cth_suite:start(
     Apps = emqx_cth_suite:start(
         [
         [
             emqx_conf,
             emqx_conf,
@@ -77,14 +81,8 @@ init_per_suite(Config) ->
         ],
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     ),
-    Auth = auth_header(),
-    [{suite_apps, Apps}, {auth, Auth} | Config].
-
-end_per_suite(Config) ->
-    emqx_cth_suite:stop(?config(suite_apps, Config)),
-    emqx_config:delete_override_conf_files(),
-    ok.
-
+    Auth = emqx_mgmt_api_test_util:auth_header_(),
+    [{suite_apps, Apps}, {auth, Auth} | Config];
 init_per_group(cluster = Group, Config) ->
 init_per_group(cluster = Group, Config) ->
     ok = emqx_cth_suite:stop_apps([emqx_dashboard]),
     ok = emqx_cth_suite:stop_apps([emqx_dashboard]),
     SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(Group, Config),
     SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(Group, Config),
@@ -105,33 +103,31 @@ init_per_group(cluster = Group, Config) ->
         ],
         ],
         #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
         #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
     ]),
     ]),
+    Auth = ?ON(SN1, emqx_mgmt_api_test_util:auth_header_()),
     [
     [
         {source_nodes, SourceNodes},
         {source_nodes, SourceNodes},
-        {target_nodes, TargetNodes}
+        {target_nodes, TargetNodes},
+        {auth, Auth}
         | Config
         | Config
-    ];
-init_per_group(_Group, Config) ->
-    Config.
+    ].
 
 
+end_per_group(local, Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config));
 end_per_group(cluster, Config) ->
 end_per_group(cluster, Config) ->
     SourceNodes = ?config(source_nodes, Config),
     SourceNodes = ?config(source_nodes, Config),
     TargetNodes = ?config(target_nodes, Config),
     TargetNodes = ?config(target_nodes, Config),
     ok = emqx_cth_cluster:stop(SourceNodes),
     ok = emqx_cth_cluster:stop(SourceNodes),
-    ok = emqx_cth_cluster:stop(TargetNodes),
-    _ = emqx_cth_suite:start_apps(
-        [emqx_mgmt_api_test_util:emqx_dashboard()],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    ok;
-end_per_group(_Group, _Config) ->
-    ok.
+    ok = emqx_cth_cluster:stop(TargetNodes).
 
 
-auth_header() ->
-    emqx_mgmt_api_test_util:auth_header_().
+init_per_testcase(TC, Config) ->
+    [Group] = [G || {G, TCs} <- groups(), lists:member(TC, TCs)],
+    snabbkaffe:start_trace(),
+    init_per_testcase(TC, Group, Config).
 
 
-init_per_testcase(_TC, Config) ->
+init_per_testcase(_TC, local, Config) ->
     {ok, _} = emqx_cluster_link_config:update([]),
     {ok, _} = emqx_cluster_link_config:update([]),
-    snabbkaffe:start_trace(),
+    Config;
+init_per_testcase(_TC, cluster, Config) ->
     Config.
     Config.
 
 
 end_per_testcase(_TC, _Config) ->
 end_per_testcase(_TC, _Config) ->
@@ -146,47 +142,52 @@ end_per_testcase(_TC, _Config) ->
 api_root() ->
 api_root() ->
     <<"cluster/links">>.
     <<"cluster/links">>.
 
 
-list() ->
-    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
-    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
+api_path(ReqPath) ->
+    emqx_mgmt_api_test_util:api_path([api_root() | ReqPath]).
+
+api_path(Host, ReqPath) ->
+    emqx_mgmt_api_test_util:api_path(Host, [api_root() | ReqPath]).
+
+api_auth(Config) ->
+    ?config(auth, Config).
 
 
-get_link(Name) ->
-    get_link(source, Name).
+list(Config) ->
+    Path = api_path([]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "", api_auth(Config)).
 
 
-get_link(SourceOrTargetCluster, Name) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]),
-    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
+get_link(Name, Config) ->
+    get_link(source, Name, Config).
 
 
-delete_link(Name) ->
-    Path = emqx_mgmt_api_test_util:api_path([api_root(), "link", Name]),
-    emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = "").
+get_link(SourceOrTargetCluster, Name, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "", api_auth(Config)).
 
 
-update_link(Name, Params) ->
-    update_link(source, Name, Params).
+delete_link(Name, Config) ->
+    Path = api_path(["link", Name]),
+    emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = "", api_auth(Config)).
 
 
-update_link(SourceOrTargetCluster, Name, Params) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]),
-    emqx_mgmt_api_test_util:simple_request(put, Path, Params).
+update_link(Name, Params, Config) ->
+    update_link(source, Name, Params, Config).
 
 
-create_link(Name, Params0) ->
+update_link(SourceOrTargetCluster, Name, Params, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name]),
+    emqx_mgmt_api_test_util:simple_request(put, Path, Params, api_auth(Config)).
+
+create_link(Name, Params0, Config) ->
+    Path = api_path([]),
     Params = Params0#{<<"name">> => Name},
     Params = Params0#{<<"name">> => Name},
-    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
-    emqx_mgmt_api_test_util:simple_request(post, Path, Params).
+    emqx_mgmt_api_test_util:simple_request(post, Path, Params, api_auth(Config)).
 
 
-get_metrics(Name) ->
-    get_metrics(source, Name).
+get_metrics(Name, Config) ->
+    get_metrics(source, Name, Config).
 
 
-get_metrics(SourceOrTargetCluster, Name) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name, "metrics"]),
-    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = []).
+get_metrics(SourceOrTargetCluster, Name, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name, "metrics"]),
+    emqx_mgmt_api_test_util:simple_request(get, Path, _Params = [], api_auth(Config)).
 
 
-reset_metrics(SourceOrTargetCluster, Name) ->
-    Host = host(SourceOrTargetCluster),
-    Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name, "metrics", "reset"]),
-    emqx_mgmt_api_test_util:simple_request(put, Path, _Params = []).
+reset_metrics(SourceOrTargetCluster, Name, Config) ->
+    Path = api_path(host(SourceOrTargetCluster), ["link", Name, "metrics", "reset"]),
+    emqx_mgmt_api_test_util:simple_request(put, Path, _Params = [], api_auth(Config)).
 
 
 host(source) -> "http://127.0.0.1:18083";
 host(source) -> "http://127.0.0.1:18083";
 host(target) -> "http://127.0.0.1:28083".
 host(target) -> "http://127.0.0.1:28083".
@@ -220,7 +221,7 @@ disable_and_force_gc(TargetOrSource, Name, Params, TCConfig, Opts) ->
             target -> ?config(source_nodes, TCConfig);
             target -> ?config(source_nodes, TCConfig);
             source -> ?config(target_nodes, TCConfig)
             source -> ?config(target_nodes, TCConfig)
         end,
         end,
-    {200, _} = update_link(TargetOrSource, Name, Params#{<<"enable">> := false}),
+    {200, _} = update_link(TargetOrSource, Name, Params#{<<"enable">> := false}, TCConfig),
     %% Note that only when the GC runs and collects the stopped actor it'll actually
     %% Note that only when the GC runs and collects the stopped actor it'll actually
     %% remove the routes
     %% remove the routes
     NowMS = erlang:system_time(millisecond),
     NowMS = erlang:system_time(millisecond),
@@ -252,8 +253,8 @@ wait_for_routes([], _ExpectedTopics) ->
 %% Test cases
 %% Test cases
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
-t_put_get_valid(_Config) ->
-    ?assertMatch({200, []}, list()),
+t_put_get_valid(Config) ->
+    ?assertMatch({200, []}, list(Config)),
 
 
     Name1 = <<"emqcl_1">>,
     Name1 = <<"emqcl_1">>,
     Link1 = link_params(#{
     Link1 = link_params(#{
@@ -265,42 +266,42 @@ t_put_get_valid(_Config) ->
         <<"server">> => <<"emqxcl_2.nohost:41883">>,
         <<"server">> => <<"emqxcl_2.nohost:41883">>,
         <<"name">> => Name2
         <<"name">> => Name2
     }),
     }),
-    ?assertMatch({201, _}, create_link(Name1, Link1)),
-    ?assertMatch({201, _}, create_link(Name2, Link2)),
-    ?assertMatch({200, [_, _]}, list()),
+    ?assertMatch({201, _}, create_link(Name1, Link1, Config)),
+    ?assertMatch({201, _}, create_link(Name2, Link2, Config)),
+    ?assertMatch({200, [_, _]}, list(Config)),
 
 
     DisabledLink1 = Link1#{<<"enable">> => false},
     DisabledLink1 = Link1#{<<"enable">> => false},
-    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1))),
-    ?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1)),
-    ?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2)),
+    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1), Config)),
+    ?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1, Config)),
+    ?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2, Config)),
 
 
     SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT},
     SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT},
     SSLLink1 = Link1#{<<"ssl">> => SSL},
     SSLLink1 = Link1#{<<"ssl">> => SSL},
-    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1))),
+    ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1), Config)),
     ?assertMatch(
     ?assertMatch(
         {200, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}},
         {200, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}},
-        get_link(Name1)
+        get_link(Name1, Config)
     ),
     ),
     ok.
     ok.
 
 
-t_put_invalid(_Config) ->
+t_put_invalid(Config) ->
     Name = <<"l1">>,
     Name = <<"l1">>,
-    {201, _} = create_link(Name, link_params()),
+    {201, _} = create_link(Name, link_params(), Config),
     ?assertMatch(
     ?assertMatch(
         {400, _},
         {400, _},
-        update_link(Name, maps:remove(<<"server">>, link_params()))
+        update_link(Name, maps:remove(<<"server">>, link_params()), Config)
     ).
     ).
 
 
 %% Tests a sequence of CRUD operations and their expected responses, for common use cases
 %% Tests a sequence of CRUD operations and their expected responses, for common use cases
 %% and configuration states.
 %% and configuration states.
-t_crud(_Config) ->
+t_crud(Config) ->
     %% No links initially.
     %% No links initially.
-    ?assertMatch({200, []}, list()),
+    ?assertMatch({200, []}, list(Config)),
     NameA = <<"a">>,
     NameA = <<"a">>,
-    ?assertMatch({404, _}, get_link(NameA)),
-    ?assertMatch({404, _}, delete_link(NameA)),
-    ?assertMatch({404, _}, update_link(NameA, link_params())),
-    ?assertMatch({404, _}, get_metrics(NameA)),
+    ?assertMatch({404, _}, get_link(NameA, Config)),
+    ?assertMatch({404, _}, delete_link(NameA, Config)),
+    ?assertMatch({404, _}, update_link(NameA, link_params(), Config)),
+    ?assertMatch({404, _}, get_metrics(NameA, Config)),
 
 
     Params1 = link_params(),
     Params1 = link_params(),
     ?assertMatch(
     ?assertMatch(
@@ -310,9 +311,12 @@ t_crud(_Config) ->
             <<"status">> := _,
             <<"status">> := _,
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
         }},
         }},
-        create_link(NameA, Params1)
+        create_link(NameA, Params1, Config)
+    ),
+    ?assertMatch(
+        {400, #{<<"code">> := <<"ALREADY_EXISTS">>}},
+        create_link(NameA, Params1, Config)
     ),
     ),
-    ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)),
     ?assertMatch(
     ?assertMatch(
         {200, [
         {200, [
             #{
             #{
@@ -322,7 +326,7 @@ t_crud(_Config) ->
                 <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
                 <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
             }
             }
         ]},
         ]},
-        list()
+        list(Config)
     ),
     ),
     ?assertMatch(
     ?assertMatch(
         {200, #{
         {200, #{
@@ -331,9 +335,9 @@ t_crud(_Config) ->
             <<"status">> := _,
             <<"status">> := _,
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
         }},
         }},
-        get_link(NameA)
+        get_link(NameA, Config)
     ),
     ),
-    ?assertMatch({200, _}, get_metrics(NameA)),
+    ?assertMatch({200, _}, get_metrics(NameA, Config)),
 
 
     Params2 = Params1#{<<"pool_size">> := 2},
     Params2 = Params1#{<<"pool_size">> := 2},
     ?assertMatch(
     ?assertMatch(
@@ -343,32 +347,30 @@ t_crud(_Config) ->
             <<"status">> := _,
             <<"status">> := _,
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
             <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
         }},
         }},
-        update_link(NameA, Params2)
+        update_link(NameA, Params2, Config)
     ),
     ),
 
 
-    ?assertMatch({204, _}, delete_link(NameA)),
-    ?assertMatch({404, _}, delete_link(NameA)),
-    ?assertMatch({404, _}, get_link(NameA)),
-    ?assertMatch({404, _}, update_link(NameA, Params1)),
-    ?assertMatch({404, _}, get_metrics(NameA)),
-    ?assertMatch({200, []}, list()),
+    ?assertMatch({204, _}, delete_link(NameA, Config)),
+    ?assertMatch({404, _}, delete_link(NameA, Config)),
+    ?assertMatch({404, _}, get_link(NameA, Config)),
+    ?assertMatch({404, _}, update_link(NameA, Params1, Config)),
+    ?assertMatch({404, _}, get_metrics(NameA, Config)),
+    ?assertMatch({200, []}, list(Config)),
 
 
     ok.
     ok.
 
 
-t_create_invalid(_Config) ->
+t_create_invalid(Config) ->
     Params = link_params(),
     Params = link_params(),
     EmptyName = <<>>,
     EmptyName = <<>>,
-    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message1}} = create_link(
-        EmptyName, Params
-    ),
+    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message1}} =
+        create_link(EmptyName, Params, Config),
     ?assertMatch(
     ?assertMatch(
         #{<<"kind">> := <<"validation_error">>, <<"reason">> := <<"Name cannot be empty string">>},
         #{<<"kind">> := <<"validation_error">>, <<"reason">> := <<"Name cannot be empty string">>},
         Message1
         Message1
     ),
     ),
     LongName = binary:copy(<<$a>>, 256),
     LongName = binary:copy(<<$a>>, 256),
-    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message2}} = create_link(
-        LongName, Params
-    ),
+    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message2}} =
+        create_link(LongName, Params, Config),
     ?assertMatch(
     ?assertMatch(
         #{
         #{
             <<"kind">> := <<"validation_error">>,
             <<"kind">> := <<"validation_error">>,
@@ -377,9 +379,8 @@ t_create_invalid(_Config) ->
         Message2
         Message2
     ),
     ),
     BadName = <<"~!@#$%^&*()_+{}:'<>?|">>,
     BadName = <<"~!@#$%^&*()_+{}:'<>?|">>,
-    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message3}} = create_link(
-        BadName, Params
-    ),
+    {400, #{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := Message3}} =
+        create_link(BadName, Params, Config),
     ?assertMatch(
     ?assertMatch(
         #{
         #{
             <<"kind">> := <<"validation_error">>,
             <<"kind">> := <<"validation_error">>,
@@ -412,7 +413,7 @@ t_status(Config) ->
                     ]
                     ]
                 }
                 }
             ]},
             ]},
-            list()
+            list(Config)
         )
         )
     ),
     ),
     ?assertMatch(
     ?assertMatch(
@@ -429,7 +430,7 @@ t_status(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
     ),
 
 
     %% If one of the nodes reports a different status, the cluster is inconsistent.
     %% If one of the nodes reports a different status, the cluster is inconsistent.
@@ -465,7 +466,7 @@ t_status(Config) ->
                 ]
                 ]
             }
             }
         ]},
         ]},
-        list()
+        list(Config)
     ),
     ),
     ?assertMatch(
     ?assertMatch(
         {200, #{
         {200, #{
@@ -481,7 +482,7 @@ t_status(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
     ),
 
 
     %% Simulating erpc failures
     %% Simulating erpc failures
@@ -512,7 +513,7 @@ t_status(Config) ->
                 ]
                 ]
             }
             }
         ]},
         ]},
-        list()
+        list(Config)
     ),
     ),
     ?assertMatch(
     ?assertMatch(
         {200, #{
         {200, #{
@@ -529,7 +530,7 @@ t_status(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
     ),
     %% Simulate another inconsistency
     %% Simulate another inconsistency
     ?ON(SN1, begin
     ?ON(SN1, begin
@@ -552,7 +553,7 @@ t_status(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_link(Name)
+        get_link(Name, Config)
     ),
     ),
 
 
     ok.
     ok.
@@ -627,7 +628,7 @@ t_metrics(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_metrics(source, SourceName)
+        get_metrics(source, SourceName, Config)
     ),
     ),
     ?assertMatch(
     ?assertMatch(
         {200, #{
         {200, #{
@@ -643,11 +644,11 @@ t_metrics(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_metrics(target, TargetName)
+        get_metrics(target, TargetName, Config)
     ),
     ),
 
 
-    SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1),
-    SourceC2 = emqx_cluster_link_SUITE:start_client(<<"sc2">>, SN2),
+    SourceC1 = emqx_cluster_link_cth:connect_client(<<"sc1">>, SN1),
+    SourceC2 = emqx_cluster_link_cth:connect_client(<<"sc2">>, SN2),
     {ok, _, _} = emqtt:subscribe(SourceC1, <<"t/sc1">>),
     {ok, _, _} = emqtt:subscribe(SourceC1, <<"t/sc1">>),
     {ok, _, _} = emqtt:subscribe(SourceC2, <<"t/sc2">>),
     {ok, _, _} = emqtt:subscribe(SourceC2, <<"t/sc2">>),
 
 
@@ -667,7 +668,7 @@ t_metrics(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_metrics(source, SourceName)
+        get_metrics(source, SourceName, Config)
     ),
     ),
     ?assertMatch(
     ?assertMatch(
         {200, #{
         {200, #{
@@ -683,11 +684,11 @@ t_metrics(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_metrics(target, TargetName)
+        get_metrics(target, TargetName, Config)
     ),
     ),
 
 
-    TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1),
-    TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2),
+    TargetC1 = emqx_cluster_link_cth:connect_client(<<"tc1">>, TN1),
+    TargetC2 = emqx_cluster_link_cth:connect_client(<<"tc2">>, TN2),
     {_, {ok, _}} =
     {_, {ok, _}} =
         ?wait_async_action(
         ?wait_async_action(
             begin
             begin
@@ -711,7 +712,7 @@ t_metrics(Config) ->
                     #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}}
                     #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}}
                 ]
                 ]
             }},
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
         )
     ),
     ),
     ?assertMatch(
     ?assertMatch(
@@ -719,7 +720,7 @@ t_metrics(Config) ->
             <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
             <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
             <<"node_metrics">> := _
             <<"node_metrics">> := _
         }},
         }},
-        get_metrics(target, TargetName)
+        get_metrics(target, TargetName, Config)
     ),
     ),
 
 
     %% Unsubscribe and remove route.
     %% Unsubscribe and remove route.
@@ -743,13 +744,13 @@ t_metrics(Config) ->
                     #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}}
                     #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}}
                 ]
                 ]
             }},
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
         )
     ),
     ),
 
 
     %% Disabling the link should remove the routes.
     %% Disabling the link should remove the routes.
     ct:pal("disabling"),
     ct:pal("disabling"),
-    {200, TargetLink0} = get_link(target, TargetName),
+    {200, TargetLink0} = get_link(target, TargetName, Config),
     TargetLink1 = remove_api_virtual_fields(TargetLink0),
     TargetLink1 = remove_api_virtual_fields(TargetLink0),
     ok = disable_and_force_gc(target, TargetName, TargetLink1, Config, #{
     ok = disable_and_force_gc(target, TargetName, TargetLink1, Config, #{
         expected_num_route_deletions => 1
         expected_num_route_deletions => 1
@@ -763,7 +764,7 @@ t_metrics(Config) ->
                 <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
                 <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}},
                 <<"node_metrics">> := _
                 <<"node_metrics">> := _
             }},
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
         )
     ),
     ),
 
 
@@ -772,7 +773,7 @@ t_metrics(Config) ->
     {_, {ok, _}} =
     {_, {ok, _}} =
         ?wait_async_action(
         ?wait_async_action(
             begin
             begin
-                {200, _} = update_link(target, TargetName, TargetLink2)
+                {200, _} = update_link(target, TargetName, TargetLink2, Config)
             end,
             end,
             #{?snk_kind := "cluster_link_extrouter_route_added"}
             #{?snk_kind := "cluster_link_extrouter_route_added"}
         ),
         ),
@@ -785,12 +786,12 @@ t_metrics(Config) ->
                 <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}},
                 <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}},
                 <<"node_metrics">> := _
                 <<"node_metrics">> := _
             }},
             }},
-            get_metrics(source, SourceName)
+            get_metrics(source, SourceName, Config)
         )
         )
     ),
     ),
 
 
     %% Reset metrics
     %% Reset metrics
-    ?assertMatch({204, _}, reset_metrics(source, SourceName)),
+    ?assertMatch({204, _}, reset_metrics(source, SourceName, Config)),
     ?assertMatch(
     ?assertMatch(
         {200, #{
         {200, #{
             <<"metrics">> := #{
             <<"metrics">> := #{
@@ -814,26 +815,29 @@ t_metrics(Config) ->
                 }
                 }
             ]
             ]
         }},
         }},
-        get_metrics(source, SourceName)
+        get_metrics(source, SourceName, Config)
     ),
     ),
 
 
-    ok.
+    ok = lists:foreach(
+        fun emqx_cluster_link_cth:disconnect_client/1,
+        [SourceC1, SourceC2, TargetC1, TargetC2]
+    ).
 
 
 %% Checks that we can update a link via the API in the same fashion as the frontend does,
 %% Checks that we can update a link via the API in the same fashion as the frontend does,
 %% by sending secrets as `******', and the secret is not mangled.
 %% by sending secrets as `******', and the secret is not mangled.
-t_update_password(_Config) ->
+t_update_password(Config) ->
     ?check_trace(
     ?check_trace(
         begin
         begin
             Name = atom_to_binary(?FUNCTION_NAME),
             Name = atom_to_binary(?FUNCTION_NAME),
             Password = <<"my secret password">>,
             Password = <<"my secret password">>,
             Params1 = link_params(#{<<"password">> => Password}),
             Params1 = link_params(#{<<"password">> => Password}),
-            {201, Response1} = create_link(Name, Params1),
+            {201, Response1} = create_link(Name, Params1, Config),
             [#{name := Name, password := WrappedPassword0}] = emqx_config:get([cluster, links]),
             [#{name := Name, password := WrappedPassword0}] = emqx_config:get([cluster, links]),
             ?assertEqual(Password, emqx_secret:unwrap(WrappedPassword0)),
             ?assertEqual(Password, emqx_secret:unwrap(WrappedPassword0)),
             Params2A = remove_api_virtual_fields(Response1),
             Params2A = remove_api_virtual_fields(Response1),
             Params2 = Params2A#{<<"pool_size">> := 2},
             Params2 = Params2A#{<<"pool_size">> := 2},
             ?assertEqual(?REDACTED, maps:get(<<"password">>, Params2)),
             ?assertEqual(?REDACTED, maps:get(<<"password">>, Params2)),
-            ?assertMatch({200, _}, update_link(Name, Params2)),
+            ?assertMatch({200, _}, update_link(Name, Params2, Config)),
             [#{name := Name, password := WrappedPassword}] = emqx_config:get([cluster, links]),
             [#{name := Name, password := WrappedPassword}] = emqx_config:get([cluster, links]),
             ?assertEqual(Password, emqx_secret:unwrap(WrappedPassword)),
             ?assertEqual(Password, emqx_secret:unwrap(WrappedPassword)),
             ok
             ok
@@ -843,7 +847,7 @@ t_update_password(_Config) ->
     ok.
     ok.
 
 
 %% Checks that we forbid duplicate topic filters.
 %% Checks that we forbid duplicate topic filters.
-t_duplicate_topic_filters(_Config) ->
+t_duplicate_topic_filters(Config) ->
     ?check_trace(
     ?check_trace(
         begin
         begin
             Name = atom_to_binary(?FUNCTION_NAME),
             Name = atom_to_binary(?FUNCTION_NAME),
@@ -852,12 +856,12 @@ t_duplicate_topic_filters(_Config) ->
                 {400, #{
                 {400, #{
                     <<"message">> := #{
                     <<"message">> := #{
                         <<"reason">> := #{
                         <<"reason">> := #{
-                            <<"reason">> := <<"invalid_topics">>,
-                            <<"topics">> := #{<<"t">> := <<"duplicate_topic_filter">>}
+                            <<"reason">> := <<"redundant_topics">>,
+                            <<"topics">> := [<<"t">>]
                         }
                         }
                     }
                     }
                 }},
                 }},
-                create_link(Name, Params1)
+                create_link(Name, Params1, Config)
             ),
             ),
             ok
             ok
         end,
         end,
@@ -867,11 +871,11 @@ t_duplicate_topic_filters(_Config) ->
 
 
 %% Verifies that some fields are not required when updating a link, such as:
 %% Verifies that some fields are not required when updating a link, such as:
 %%  - clientid
 %%  - clientid
-t_optional_fields_update(_Config) ->
+t_optional_fields_update(Config) ->
     Name = <<"mylink">>,
     Name = <<"mylink">>,
     Params0 = maps:without([<<"clientid">>], link_params()),
     Params0 = maps:without([<<"clientid">>], link_params()),
-    {201, _} = create_link(Name, Params0),
-    ?assertMatch({200, _}, update_link(Name, Params0)),
+    {201, _} = create_link(Name, Params0, Config),
+    ?assertMatch({200, _}, update_link(Name, Params0, Config)),
     ok.
     ok.
 
 
 %% Verifies that, if we disable a link and then re-enable it, it should keep working.
 %% Verifies that, if we disable a link and then re-enable it, it should keep working.
@@ -880,15 +884,17 @@ t_disable_reenable(Config) ->
     [SN1, _SN2] = SourceNodes = ?config(source_nodes, Config),
     [SN1, _SN2] = SourceNodes = ?config(source_nodes, Config),
     [TN1, TN2] = ?config(target_nodes, Config),
     [TN1, TN2] = ?config(target_nodes, Config),
     SourceName = <<"cl.target">>,
     SourceName = <<"cl.target">>,
-    SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1),
-    TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1),
-    TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2),
+
+    SourceC1 = emqx_cluster_link_cth:connect_client(<<"sc1">>, SN1),
+    TargetC1 = emqx_cluster_link_cth:connect_client(<<"tc1">>, TN1),
+    TargetC2 = emqx_cluster_link_cth:connect_client(<<"tc2">>, TN2),
     Topic1 = <<"t/tc1">>,
     Topic1 = <<"t/tc1">>,
     Topic2 = <<"t/tc2">>,
     Topic2 = <<"t/tc2">>,
     {ok, _, _} = emqtt:subscribe(TargetC1, Topic1),
     {ok, _, _} = emqtt:subscribe(TargetC1, Topic1),
     {ok, _, _} = emqtt:subscribe(TargetC2, Topic2),
     {ok, _, _} = emqtt:subscribe(TargetC2, Topic2),
     %% fixme: use snabbkaffe subscription
     %% fixme: use snabbkaffe subscription
-    ?block_until(#{?snk_kind := clink_route_sync_complete}),
+    ?block_until(#{?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := TN1}}),
+    ?block_until(#{?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := TN2}}),
     {ok, _} = emqtt:publish(SourceC1, Topic1, <<"1">>, [{qos, 1}]),
     {ok, _} = emqtt:publish(SourceC1, Topic1, <<"1">>, [{qos, 1}]),
     {ok, _} = emqtt:publish(SourceC1, Topic2, <<"2">>, [{qos, 1}]),
     {ok, _} = emqtt:publish(SourceC1, Topic2, <<"2">>, [{qos, 1}]),
     %% Sanity check: link is working, initially.
     %% Sanity check: link is working, initially.
@@ -896,13 +902,13 @@ t_disable_reenable(Config) ->
     ?assertReceive({publish, #{topic := Topic2, payload := <<"2">>}}),
     ?assertReceive({publish, #{topic := Topic2, payload := <<"2">>}}),
 
 
     %% Now we just disable and re-enable it in the link in the source cluster.
     %% Now we just disable and re-enable it in the link in the source cluster.
-    {200, #{<<"enable">> := true} = SourceLink0} = get_link(source, SourceName),
+    {200, #{<<"enable">> := true} = SourceLink0} = get_link(source, SourceName, Config),
     SourceLink1 = remove_api_virtual_fields(SourceLink0),
     SourceLink1 = remove_api_virtual_fields(SourceLink0),
     %% We force GC to simulate that we left the link disable for enough time that the GC
     %% We force GC to simulate that we left the link disable for enough time that the GC
     %% kicks in.
     %% kicks in.
     ?assertMatch(
     ?assertMatch(
         {200, #{<<"enable">> := false}},
         {200, #{<<"enable">> := false}},
-        update_link(source, SourceName, SourceLink1#{<<"enable">> := false})
+        update_link(source, SourceName, SourceLink1#{<<"enable">> := false}, Config)
     ),
     ),
     %% In the original issue, GC deleted the state of target cluster's agent in source
     %% In the original issue, GC deleted the state of target cluster's agent in source
     %% cluster.  After the fix, there's no longer GC, so we ignore timeouts here.
     %% cluster.  After the fix, there's no longer GC, so we ignore timeouts here.
@@ -913,7 +919,7 @@ t_disable_reenable(Config) ->
     ),
     ),
     ?assertMatch(
     ?assertMatch(
         {200, #{<<"enable">> := true}},
         {200, #{<<"enable">> := true}},
-        update_link(source, SourceName, SourceLink1)
+        update_link(source, SourceName, SourceLink1, Config)
     ),
     ),
 
 
     Topic3 = <<"t/tc3">>,
     Topic3 = <<"t/tc3">>,
@@ -932,4 +938,8 @@ t_disable_reenable(Config) ->
     ?assertReceive({publish, #{topic := Topic2, payload := <<"4">>}}),
     ?assertReceive({publish, #{topic := Topic2, payload := <<"4">>}}),
     ?assertReceive({publish, #{topic := Topic3, payload := <<"5">>}}),
     ?assertReceive({publish, #{topic := Topic3, payload := <<"5">>}}),
     ?assertReceive({publish, #{topic := Topic4, payload := <<"6">>}}),
     ?assertReceive({publish, #{topic := Topic4, payload := <<"6">>}}),
-    ok.
+
+    ok = lists:foreach(
+        fun emqx_cluster_link_cth:disconnect_client/1,
+        [SourceC1, TargetC1, TargetC2]
+    ).

+ 31 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_cth.erl

@@ -0,0 +1,31 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_cth).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%
+
+-spec connect_client_unlink(_ClientID :: binary(), node()) -> _Pid :: emqtt:client().
+connect_client_unlink(ClientId, Node) ->
+    Client = connect_client(ClientId, Node),
+    _ = erlang:unlink(Client),
+    Client.
+
+-spec connect_client(_ClientID :: binary(), node()) -> _Pid :: emqtt:client().
+connect_client(ClientId, Node) ->
+    Port = tcp_port(Node),
+    {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+-spec disconnect_client(emqtt:client()) -> ok.
+disconnect_client(Pid) ->
+    emqtt:disconnect(Pid).
+
+tcp_port(Node) ->
+    {_Host, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    Port.

+ 3 - 0
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -309,6 +309,9 @@ maybe_json_decode(X) ->
 
 
 simple_request(Method, Path, Params) ->
 simple_request(Method, Path, Params) ->
     AuthHeader = auth_header_(),
     AuthHeader = auth_header_(),
+    simple_request(Method, Path, Params, AuthHeader).
+
+simple_request(Method, Path, Params, AuthHeader) ->
     Opts = #{return_all => true},
     Opts = #{return_all => true},
     case request_api(Method, Path, "", AuthHeader, Params, Opts) of
     case request_api(Method, Path, "", AuthHeader, Params, Opts) of
         {ok, {{_, Status, _}, _Headers, Body0}} ->
         {ok, {{_, Status, _}, _Headers, Body0}} ->