Просмотр исходного кода

test(clusterlink): add more test cases

Serge Tupchii 1 год назад
Родитель
Сommit
a95a08efd3

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -205,7 +205,7 @@ actor_init(
                     {error, <<"bad_remote_cluster_link_name">>}
             end;
         #{enable := false} ->
-            {error, <<"clster_link_disabled">>}
+            {error, <<"cluster_link_disabled">>}
     end.
 
 actor_init_ack(#{actor := Actor}, Res, MsgIn) ->

+ 56 - 21
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -16,6 +16,13 @@
     push_persistent_route/4
 ]).
 
+%% debug/test helpers
+-export([
+    status/1,
+    where/1,
+    where/2
+]).
+
 -export([
     start_link_actor/4,
     start_link_syncer/4
@@ -46,8 +53,8 @@
 -define(CLIENT_NAME(Cluster), ?NAME(Cluster, client)).
 -define(SYNCER_NAME(Cluster), ?NAME(Cluster, syncer)).
 -define(SYNCER_REF(Cluster), {via, gproc, ?SYNCER_NAME(Cluster)}).
--define(ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, actor)}).
 -define(ACTOR_NAME(Cluster), ?NAME(Cluster, actor)).
+-define(ACTOR_REF(Cluster), {via, gproc, ?ACTOR_NAME(Cluster)}).
 
 -define(MAX_BATCH_SIZE, 4000).
 -define(MIN_SYNC_INTERVAL, 10).
@@ -85,6 +92,22 @@
     end
 ).
 
+%% State record of the actor process. It is declared ahead of the debug helpers
+%% because `status/1` pattern-matches on it.
+-record(st, {
+    target :: binary(),
+    actor :: binary(),
+    incarnation :: non_neg_integer(),
+    client :: undefined | pid(),
+    bootstrapped :: boolean(),
+    reconnect_timer :: undefined | reference(),
+    %% Reference compared against incoming `{timeout, TRef, _Heartbeat}` messages.
+    heartbeat_timer :: undefined | reference(),
+    actor_init_req_id :: undefined | binary(),
+    %% Reference compared against incoming `{timeout, TRef, actor_reinit}` messages.
+    actor_init_timer :: undefined | reference(),
+    remote_actor_info :: undefined | map(),
+    %% `status` and `error` are the two fields exposed by `status/1`; a failed
+    %% handshake or an init timeout stores `disconnected` plus the failure reason.
+    status :: connecting | connected | disconnected,
+    error :: undefined | term(),
+    link_conf :: map()
+}).
+
 push(TargetCluster, OpName, Topic, ID) ->
     do_push(?SYNCER_NAME(TargetCluster), OpName, Topic, ID).
 
@@ -99,6 +122,24 @@ do_push(SyncerName, OpName, Topic, ID) ->
             dropped
     end.
 
+%% Debug/test helpers
+%% Shorthand for `where(actor, Cluster)`.
+where(Cluster) ->
+    where(actor, Cluster).
+
+%% Looks up the gproc registration of the requested kind of actor for `Cluster`:
+%% `actor` resolves ?ACTOR_NAME, `ps_actor` resolves ?PS_ACTOR_NAME.
+%% The result of `gproc:where/1` is returned unchanged (`status/1` treats it as
+%% either a pid or `undefined`).
+where(actor, Cluster) ->
+    gproc:where(?ACTOR_NAME(Cluster));
+where(ps_actor, Cluster) ->
+    gproc:where(?PS_ACTOR_NAME(Cluster)).
+
+%% Reports `#{status => ..., error => ...}` taken from the live state of the actor
+%% registered for `Cluster`, or `undefined` when no such actor is registered.
+%% Uses `sys:get_state/1`, so it is meant for debugging and tests only.
+status(Cluster) ->
+    case where(actor, Cluster) of
+        undefined ->
+            undefined;
+        ActorPid when is_pid(ActorPid) ->
+            #st{status = LinkStatus, error = LastError} = sys:get_state(ActorPid),
+            #{error => LastError, status => LinkStatus}
+    end.
+
 %% Supervisor:
 %% 1. Actor + MQTT Client
 %% 2. Syncer
@@ -290,24 +331,6 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) ->
         type => worker
     }.
 
-%%
-
--record(st, {
-    target :: binary(),
-    actor :: binary(),
-    incarnation :: non_neg_integer(),
-    client :: undefined | pid(),
-    bootstrapped :: boolean(),
-    reconnect_timer :: undefined | reference(),
-    heartbeat_timer :: undefined | reference(),
-    actor_init_req_id :: undefined | binary(),
-    actor_init_timer :: undefined | reference(),
-    remote_actor_info :: undefined | map(),
-    status :: connecting | connected | disconnected,
-    error :: undefined | term(),
-    link_conf :: map()
-}).
-
 mk_state(#{upstream := TargetCluster} = LinkConf, Actor, Incarnation) ->
     #st{
         target = TargetCluster,
@@ -361,6 +384,12 @@ handle_info(
                 remote_link_proto_ver => maps:get(proto_ver, AckInfoMap, undefined)
             }),
             _ = maybe_alarm(Reason, St1),
+            ?tp(
+                debug,
+                clink_handshake_error,
+                #{actor => {St1#st.actor, St1#st.incarnation}, reason => Reason}
+            ),
+            %% TODO: retry after a timeout?
             {noreply, St1#st{error = Reason, status = disconnected}}
     end;
 handle_info({publish, #{}}, St) ->
@@ -376,7 +405,7 @@ handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
     Reason = init_timeout,
     _ = maybe_alarm(Reason, St),
     {noreply,
-        init_remote_actor(St#st{reconnect_timer = undefined, status = disconnected, error = Reason})};
+        init_remote_actor(St#st{actor_init_timer = undefined, status = disconnected, error = Reason})};
 handle_info({timeout, TRef, _Heartbeat}, St = #st{heartbeat_timer = TRef}) ->
     {noreply, process_heartbeat(St#st{heartbeat_timer = undefined})};
 %% Stale timeout.
@@ -386,7 +415,8 @@ handle_info(Info, St) ->
     ?SLOG(warning, #{msg => "unexpected_info", info => Info}),
     {noreply, St}.
 
-terminate(_Reason, _State) ->
+terminate(_Reason, State) ->
+    _ = maybe_deactivate_alarm(State),
     ok.
 
 process_connect(St = #st{target = TargetCluster, actor = Actor, link_conf = Conf}) ->
@@ -507,6 +537,11 @@ run_bootstrap(St = #st{target = TargetCluster, link_conf = #{topics := Topics}})
 run_bootstrap(Bootstrap, St) ->
     case emqx_cluster_link_router_bootstrap:next_batch(Bootstrap) of
         done ->
+            ?tp(
+                debug,
+                clink_route_bootstrap_complete,
+                #{actor => {St#st.actor, St#st.incarnation}, cluster => St#st.target}
+            ),
             process_bootstrapped(St);
         {Batch, NBootstrap} ->
             %% TODO: Better error handling.

+ 33 - 5
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -15,7 +15,17 @@
 %%
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        {group, shared_subs},
+        {group, non_shared_subs}
+    ].
+
+%% Every test case of the suite runs twice: once in the `shared_subs` group and
+%% once in the `non_shared_subs` group.
+groups() ->
+    TestCases = emqx_common_test_helpers:all(?MODULE),
+    [{GroupName, TestCases} || GroupName <- [shared_subs, non_shared_subs]].
 
 init_per_suite(Config) ->
     Config.
@@ -23,6 +33,14 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     ok.
 
+%% Tags the CT config with `is_shared_sub`, which the test cases read to decide
+%% whether to subscribe through `maybe_shared_topic/2` with a `$share/...` filter.
+init_per_group(non_shared_subs, Config) ->
+    NonSharedFlag = {is_shared_sub, false},
+    [NonSharedFlag | Config];
+init_per_group(shared_subs, Config) ->
+    SharedFlag = {is_shared_sub, true},
+    [SharedFlag | Config].
+
+%% `init_per_group/2` only adds a flag to the config, so there is nothing to clean up.
+end_per_group(_Group, _Config) ->
+    ok.
+
 init_per_testcase(TCName, Config) ->
     emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config).
 
@@ -136,11 +154,14 @@ t_message_forwarding('end', Config) ->
 t_message_forwarding(Config) ->
     [SourceNode1 | _] = nodes_source(Config),
     [TargetNode1, TargetNode2 | _] = nodes_target(Config),
+
     SourceC1 = start_client("t_message_forwarding", SourceNode1),
     TargetC1 = start_client("t_message_forwarding1", TargetNode1),
     TargetC2 = start_client("t_message_forwarding2", TargetNode2),
-    {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/+">>, qos1),
-    {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/#">>, qos1),
+    IsShared = ?config(is_shared_sub, Config),
+
+    {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/+">>), qos1),
+    {ok, _, _} = emqtt:subscribe(TargetC2, maybe_shared_topic(IsShared, <<"t/#">>), qos1),
     {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}),
     {ok, _} = emqtt:publish(SourceC1, <<"t/42">>, <<"hello">>, qos1),
     ?assertReceive(
@@ -178,8 +199,10 @@ t_target_extrouting_gc(Config) ->
     SourceC1 = start_client("t_target_extrouting_gc", SourceNode1),
     TargetC1 = start_client_unlink("t_target_extrouting_gc1", TargetNode1),
     TargetC2 = start_client_unlink("t_target_extrouting_gc2", TargetNode2),
-    {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/#">>, qos1),
-    {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/+">>, qos1),
+    IsShared = ?config(is_shared_sub, Config),
+
+    {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/#">>), qos1),
+    {ok, _, _} = emqtt:subscribe(TargetC2, maybe_shared_topic(IsShared, <<"t/+">>), qos1),
     {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}),
     {ok, _} = emqtt:publish(SourceC1, <<"t/1">>, <<"HELLO1">>, qos1),
     {ok, _} = emqtt:publish(SourceC1, <<"t/2/ext">>, <<"HELLO2">>, qos1),
@@ -232,6 +255,11 @@ t_target_extrouting_gc(Config) ->
 
 %%
 
+%% Returns `Topic` untouched for regular subscriptions, or prefixed with the
+%% `$share/test-group/` shared-subscription group when `IsShared` is `true`.
+maybe_shared_topic(false, Topic) ->
+    Topic;
+maybe_shared_topic(true, Topic) ->
+    SharePrefix = <<"$share/test-group/">>,
+    <<SharePrefix/binary, Topic/binary>>.
+
 start_client_unlink(ClientId, Node) ->
     Client = start_client(ClientId, Node),
     _ = erlang:unlink(Client),

+ 132 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -0,0 +1,132 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])).
+-define(CONF_PATH, [cluster, links]).
+
+-define(CACERT, <<
+    "-----BEGIN CERTIFICATE-----\n"
+    "MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV\n"
+    "BAYTAkNOMREwDwYDVQQIDAhoYW5nemhvdTEMMAoGA1UECgwDRU1RMQ8wDQYDVQQD\n"
+    "DAZSb290Q0EwHhcNMjAwNTA4MDgwNjUyWhcNMzAwNTA2MDgwNjUyWjA/MQswCQYD\n"
+    "VQQGEwJDTjERMA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UE\n"
+    "AwwGUm9vdENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzcgVLex1\n"
+    "EZ9ON64EX8v+wcSjzOZpiEOsAOuSXOEN3wb8FKUxCdsGrsJYB7a5VM/Jot25Mod2\n"
+    "juS3OBMg6r85k2TWjdxUoUs+HiUB/pP/ARaaW6VntpAEokpij/przWMPgJnBF3Ur\n"
+    "MjtbLayH9hGmpQrI5c2vmHQ2reRZnSFbY+2b8SXZ+3lZZgz9+BaQYWdQWfaUWEHZ\n"
+    "uDaNiViVO0OT8DRjCuiDp3yYDj3iLWbTA/gDL6Tf5XuHuEwcOQUrd+h0hyIphO8D\n"
+    "tsrsHZ14j4AWYLk1CPA6pq1HIUvEl2rANx2lVUNv+nt64K/Mr3RnVQd9s8bK+TXQ\n"
+    "KGHd2Lv/PALYuwIDAQABo1AwTjAdBgNVHQ4EFgQUGBmW+iDzxctWAWxmhgdlE8Pj\n"
+    "EbQwHwYDVR0jBBgwFoAUGBmW+iDzxctWAWxmhgdlE8PjEbQwDAYDVR0TBAUwAwEB\n"
+    "/zANBgkqhkiG9w0BAQsFAAOCAQEAGbhRUjpIred4cFAFJ7bbYD9hKu/yzWPWkMRa\n"
+    "ErlCKHmuYsYk+5d16JQhJaFy6MGXfLgo3KV2itl0d+OWNH0U9ULXcglTxy6+njo5\n"
+    "CFqdUBPwN1jxhzo9yteDMKF4+AHIxbvCAJa17qcwUKR5MKNvv09C6pvQDJLzid7y\n"
+    "E2dkgSuggik3oa0427KvctFf8uhOV94RvEDyqvT5+pgNYZ2Yfga9pD/jjpoHEUlo\n"
+    "88IGU8/wJCx3Ds2yc8+oBg/ynxG8f/HmCC1ET6EHHoe2jlo8FpU/SgGtghS1YL30\n"
+    "IWxNsPrUP+XsZpBJy/mvOhE5QXo6Y35zDqqj8tI7AGmAWu22jg==\n"
+    "-----END CERTIFICATE-----"
+>>).
+
+%% The list of test cases is produced by `emqx_common_test_helpers:all/1` for this module.
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% Starts the applications the API tests depend on (with the dashboard HTTP listener
+%% bound to 18083) and stores both the started apps and an API auth header in the
+%% CT config, under `suite_apps` and `auth` respectively.
+init_per_suite(Config) ->
+    %% This is called by emqx_machine in EMQX release
+    emqx_otel_app:configure_otel_deps(),
+    DashboardConf = "dashboard.listeners.http { enable = true, bind = 18083 }",
+    AppSpecs = [
+        emqx_conf,
+        emqx_management,
+        {emqx_dashboard, DashboardConf},
+        emqx_cluster_link
+    ],
+    StartOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
+    StartedApps = emqx_cth_suite:start(AppSpecs, StartOpts),
+    %% The auth header can only be created once the apps above are running.
+    AuthHeader = auth_header(),
+    [{suite_apps, StartedApps}, {auth, AuthHeader} | Config].
+
+%% Stops the applications recorded under `suite_apps` by `init_per_suite/1`, then
+%% calls `emqx_config:delete_override_conf_files/0`.
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)),
+    emqx_config:delete_override_conf_files(),
+    ok.
+
+%% Creates the default API app and returns the auth header derived from it.
+%% Crashes (badmatch) if the app cannot be created.
+auth_header() ->
+    {ok, DefaultApp} = emqx_common_test_http:create_default_app(),
+    emqx_common_test_http:auth_header(DefaultApp).
+
+%% Every test case starts from an empty list of cluster links; setup crashes if the
+%% config update does not return `{ok, _}`.
+init_per_testcase(_TC, Config) ->
+    {ok, _} = emqx_cluster_link_config:update([]),
+    Config.
+
+%% No per-test cleanup: the next `init_per_testcase/2` resets the links again.
+end_per_testcase(_TC, _Config) ->
+    ok.
+
+%% Round-trips valid link lists through the REST API: every PUT is followed by a GET
+%% that must return the same links in the same order. The steps build on each other
+%% (empty -> two links -> first link disabled -> reordered with SSL enabled).
+t_put_get_valid(Config) ->
+    Auth = ?config(auth, Config),
+    Path = ?API_PATH,
+    %% `init_per_testcase/2` reset the links, so the initial listing must be empty.
+    {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
+    ?assertMatch([], emqx_utils_json:decode(Resp)),
+
+    Link1 = #{
+        <<"pool_size">> => 1,
+        <<"server">> => <<"emqxcl_2.nohost:31883">>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"name">> => <<"emqcl_1">>
+    },
+    Link2 = #{
+        <<"pool_size">> => 1,
+        <<"server">> => <<"emqxcl_2.nohost:41883">>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"name">> => <<"emqcl_2">>
+    },
+    ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link1, Link2])),
+
+    %% Link1/Link2 are already bound, so this pattern demands exact equality.
+    {ok, Resp1} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
+    ?assertMatch([Link1, Link2], emqx_utils_json:decode(Resp1)),
+
+    %% Disabling a link is a valid update and must be reflected by GET.
+    DisabledLink1 = Link1#{<<"enable">> => false},
+    ?assertMatch(
+        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [DisabledLink1, Link2])
+    ),
+
+    {ok, Resp2} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
+    ?assertMatch([DisabledLink1, Link2], emqx_utils_json:decode(Resp2)),
+
+    %% The CA certificate is submitted inline as PEM text; the links are also reordered.
+    SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT},
+    SSLLink1 = Link1#{<<"ssl">> => SSL},
+    ?assertMatch(
+        {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link2, SSLLink1])
+    ),
+    {ok, Resp3} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
+
+    %% `_Path` is a fresh variable (unrelated to `Path` above): any returned
+    %% `cacertfile` value is accepted, it is not compared with the submitted PEM.
+    ?assertMatch(
+        [Link2, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}],
+        emqx_utils_json:decode(Resp3)
+    ).
+
+%% PUT requests must fail with an error tuple carrying 400 when the list holds two
+%% links with the same name, or a link without the `name` field.
+t_put_invalid(Config) ->
+    Path = ?API_PATH,
+    Auth = ?config(auth, Config),
+    Link = #{
+        <<"name">> => <<"emqcl_1">>,
+        <<"server">> => <<"emqxcl_2.nohost:31883">>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"pool_size">> => 1
+    },
+    PutLinks = fun(Links) ->
+        emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, Links)
+    end,
+    %% Duplicated link name.
+    ?assertMatch({error, {_, 400, _}}, PutLinks([Link, Link])),
+    %% Missing link name.
+    ?assertMatch({error, {_, 400, _}}, PutLinks([maps:remove(<<"name">>, Link)])).

+ 647 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl

@@ -0,0 +1,647 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_cluster_link_config_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/asserts.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%% The list of test cases is produced by `emqx_common_test_helpers:all/1` for this module.
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% No suite-wide setup: every test case starts and stops its own clusters in its
+%% `'init'` / `'end'` clauses.
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+%% Per-test setup is delegated to `emqx_common_test_helpers`.
+%% NOTE(review): presumably the helper dispatches to the `TCName('init', Config)`
+%% clause defined next to each test case - confirm against the helper module.
+init_per_testcase(TCName, Config) ->
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config).
+
+%% Per-test teardown counterpart; presumably dispatches to `TCName('end', Config)`.
+end_per_testcase(TCName, Config) ->
+    emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
+
+%% Builds the node specs of the two test clusters and returns `{NodesA, NodesB}`.
+%%
+%% Cluster A has three nodes (the third one is a replicant), cluster B has two nodes
+%% with explicit base ports. Only the last node of each cluster gets the extra MQTT
+%% listener config for `PortA` / `PortB` merged into its `emqx_conf`.
+mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) ->
+    SpecOpts = #{work_dir => emqx_cth_suite:work_dir(Config)},
+    PlainAppsA = [{emqx_conf, ConfA}, emqx_cluster_link],
+    ListenerConfA = combine([ConfA, conf_mqtt_listener(PortA)]),
+    ListenerAppsA = [{emqx_conf, ListenerConfA}, emqx_cluster_link],
+    PlainAppsB = [{emqx_conf, ConfB}, emqx_cluster_link],
+    ListenerConfB = combine([ConfB, conf_mqtt_listener(PortB)]),
+    ListenerAppsB = [{emqx_conf, ListenerConfB}, emqx_cluster_link],
+
+    ClusterASpecs = [
+        {mk_nodename(NameA, 1), #{apps => PlainAppsA}},
+        {mk_nodename(NameA, 2), #{apps => PlainAppsA}},
+        {mk_nodename(NameA, 3), #{apps => ListenerAppsA, role => replicant}}
+    ],
+    ClusterBSpecs = [
+        {mk_nodename(NameB, 1), #{apps => PlainAppsB, base_port => 20100}},
+        {mk_nodename(NameB, 2), #{apps => ListenerAppsB, base_port => 20200}}
+    ],
+    NodesA = emqx_cth_cluster:mk_nodespecs(ClusterASpecs, SpecOpts),
+    NodesB = emqx_cth_cluster:mk_nodespecs(ClusterBSpecs, SpecOpts),
+    {NodesA, NodesB}.
+
+%% Fixture of `t_config_update/1`.
+%% 'init': starts both clusters (names derived from the test case name), starts the
+%%         snabbkaffe trace and publishes clusters, listener ports and cluster names
+%%         through the CT config.
+%% 'end':  stops the trace first and then both clusters.
+t_config_update('init', Config) ->
+    {PortA, PortB} = {31883, 41883},
+    ClusterNameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
+    ClusterNameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
+    BaseConfA = combine([conf_cluster(ClusterNameA), conf_log()]),
+    BaseConfB = combine([conf_cluster(ClusterNameB), conf_log()]),
+    {SpecsA, SpecsB} = mk_clusters(
+        ClusterNameA, ClusterNameB, PortA, PortB, BaseConfA, BaseConfB, Config
+    ),
+    StartedA = emqx_cth_cluster:start(SpecsA),
+    StartedB = emqx_cth_cluster:start(SpecsB),
+    ok = snabbkaffe:start_trace(),
+    TestEnv = [
+        {cluster_a, StartedA},
+        {cluster_b, StartedB},
+        {lport_a, PortA},
+        {lport_b, PortB},
+        {name_a, ClusterNameA},
+        {name_b, ClusterNameB}
+    ],
+    TestEnv ++ Config;
+t_config_update('end', Config) ->
+    ok = snabbkaffe:stop(),
+    lists:foreach(
+        fun(ClusterKey) -> ok = emqx_cth_cluster:stop(?config(ClusterKey, Config)) end,
+        [cluster_a, cluster_b]
+    ).
+
+%% End-to-end check of link config changes between two clusters without durable storage:
+%%   1. add links in both directions and wait until every node finished route bootstrap;
+%%   2. verify messages are forwarded both ways;
+%%   3. change cluster A's link topics and verify only the new topics are forwarded;
+%%   4. disable both links and verify nothing is forwarded anymore;
+%%   5. delete the links.
+%% The test process receives `{publish, ...}` messages from both MQTT clients, which is
+%% why `client_pid` is matched in the receive assertions.
+t_config_update(Config) ->
+    [NodeA1, _, _] = ?config(cluster_a, Config),
+    [NodeB1, _] = ?config(cluster_b, Config),
+    LPortA = ?config(lport_a, Config),
+    LPortB = ?config(lport_b, Config),
+    NameA = ?config(name_a, Config),
+    NameB = ?config(name_b, Config),
+
+    ClientA = start_client("t_config_a", NodeA1),
+    ClientB = start_client("t_config_b", NodeB1),
+
+    {ok, _, _} = emqtt:subscribe(ClientA, <<"t/test/1/+">>, qos1),
+    {ok, _, _} = emqtt:subscribe(ClientB, <<"t/test-topic">>, qos1),
+
+    %% add link
+    %% Each cluster connects to the MQTT listener port of the other one.
+    LinkConfA = #{
+        <<"enable">> => true,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"upstream">> => NameB
+    },
+    LinkConfB = #{
+        <<"enable">> => true,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"upstream">> => NameA
+    },
+
+    %% Subscribe to the trace before triggering the updates so no event is missed.
+    {ok, SubRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
+        %% 5 nodes = 5 actors (durable storage is disabled)
+        5,
+        30_000
+    ),
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])),
+    ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])),
+
+    ?assertMatch(
+        {ok, [
+            #{?snk_kind := clink_route_bootstrap_complete},
+            #{?snk_kind := clink_route_bootstrap_complete},
+            #{?snk_kind := clink_route_bootstrap_complete},
+            #{?snk_kind := clink_route_bootstrap_complete},
+            #{?snk_kind := clink_route_bootstrap_complete}
+        ]},
+        snabbkaffe:receive_events(SubRef)
+    ),
+
+    {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"hello-from-a">>, qos1),
+    {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"hello-from-b">>, qos1),
+
+    ?assertReceive(
+        {publish, #{
+            topic := <<"t/test-topic">>, payload := <<"hello-from-a">>, client_pid := ClientB
+        }},
+        7000
+    ),
+    ?assertReceive(
+        {publish, #{
+            topic := <<"t/test/1/1">>, payload := <<"hello-from-b">>, client_pid := ClientA
+        }},
+        7000
+    ),
+    %% no more messages expected
+    ?assertNotReceive({publish, _Message = #{}}),
+
+    {ok, SubRef1} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
+        %% 3 nodes in cluster a
+        3,
+        30_000
+    ),
+
+    %% update link
+    LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])),
+
+    ?assertMatch(
+        {ok, [
+            #{?snk_kind := clink_route_bootstrap_complete},
+            #{?snk_kind := clink_route_bootstrap_complete},
+            #{?snk_kind := clink_route_bootstrap_complete}
+        ]},
+        snabbkaffe:receive_events(SubRef1)
+    ),
+
+    %% wait for route sync on ClientA node
+    {{ok, _, _}, {ok, _}} = ?wait_async_action(
+        emqtt:subscribe(ClientA, <<"t/new/1">>, qos1),
+        #{?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := NodeA1}},
+        10_000
+    ),
+
+    %% not expected to be received anymore
+    {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"not-expected-hello-from-b">>, qos1),
+    {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"hello-from-b-1">>, qos1),
+    ?assertReceive(
+        {publish, #{topic := <<"t/new/1">>, payload := <<"hello-from-b-1">>, client_pid := ClientA}},
+        7000
+    ),
+    ?assertNotReceive({publish, _Message = #{}}),
+
+    %% disable link
+    LinkConfA2 = LinkConfA1#{<<"enable">> => false},
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA2]])),
+    %% must be already blocked by the receiving cluster even if external routing state is not
+    %% updated yet
+    {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1),
+
+    LinkConfB1 = LinkConfB#{<<"enable">> => false},
+    ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB1]])),
+    {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1),
+
+    %% Neither of the two "not-expected" messages above may arrive.
+    ?assertNotReceive({publish, _Message = #{}}, 3000),
+
+    %% delete links
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])),
+    ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[]])),
+
+    ok = emqtt:stop(ClientA),
+    ok = emqtt:stop(ClientB).
+
+%% Fixture of `t_config_validations/1`.
+%% 'init': starts one node per cluster (the first spec produced by `mk_clusters/7`),
+%%         starts the snabbkaffe trace and publishes the test environment through the
+%%         CT config.
+%% 'end':  stops the trace first and then both clusters.
+t_config_validations('init', Config) ->
+    {PortA, PortB} = {31883, 41883},
+    ClusterNameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
+    ClusterNameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
+    BaseConfA = combine([conf_cluster(ClusterNameA), conf_log()]),
+    BaseConfB = combine([conf_cluster(ClusterNameB), conf_log()]),
+    %% Single node clusters are enough for a basic validation test
+    {[SpecA, _, _], [SpecB, _]} = mk_clusters(
+        ClusterNameA, ClusterNameB, PortA, PortB, BaseConfA, BaseConfB, Config
+    ),
+    StartedA = emqx_cth_cluster:start([SpecA]),
+    StartedB = emqx_cth_cluster:start([SpecB]),
+    ok = snabbkaffe:start_trace(),
+    TestEnv = [
+        {cluster_a, StartedA},
+        {cluster_b, StartedB},
+        {lport_a, PortA},
+        {lport_b, PortB},
+        {name_a, ClusterNameA},
+        {name_b, ClusterNameB}
+    ],
+    TestEnv ++ Config;
+t_config_validations('end', Config) ->
+    ok = snabbkaffe:stop(),
+    lists:foreach(
+        fun(ClusterKey) -> ok = emqx_cth_cluster:stop(?config(ClusterKey, Config)) end,
+        [cluster_a, cluster_b]
+    ).
+
+%% Exercises `emqx_cluster_link_config:update/1` on a single node of cluster A:
+%% first with configs that must be rejected (duplicated links, invalid topics,
+%% missing required fields), then with a series of valid updates that change
+%% different parts of the link config.
+t_config_validations(Config) ->
+    [NodeA] = ?config(cluster_a, Config),
+    LPortB = ?config(lport_b, Config),
+
+    NameB = ?config(name_b, Config),
+
+    LinkConfA = #{
+        <<"enable">> => true,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"upstream">> => NameB
+    },
+    %% Two entries with the same `upstream` are duplicates even if other fields differ.
+    DuplicatedLinks = [LinkConfA, LinkConfA#{<<"enable">> => false, <<"pool_size">> => 2}],
+    ?assertMatch(
+        {error, #{reason := #{reason := duplicated_cluster_links, names := _}}},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [DuplicatedLinks])
+    ),
+
+    %% Both topic lists below must be rejected with `invalid_topics`.
+    InvalidTopics = [<<"t/test/#">>, <<"$LINK/cluster/test/#">>],
+    InvalidTopics1 = [<<"t/+/#/+">>, <<>>],
+    ?assertMatch(
+        {error, #{reason := #{reason := invalid_topics, topics := _}}},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [
+            [LinkConfA#{<<"topics">> => InvalidTopics}]
+        ])
+    ),
+    ?assertMatch(
+        {error, #{reason := #{reason := invalid_topics, topics := _}}},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [
+            [LinkConfA#{<<"topics">> => InvalidTopics1}]
+        ])
+    ),
+    %% `upstream`, `server` and `topics` are all required fields.
+    ?assertMatch(
+        {error, #{reason := required_field}},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [
+            [maps:remove(<<"upstream">>, LinkConfA)]
+        ])
+    ),
+    ?assertMatch(
+        {error, #{reason := required_field}},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [[maps:remove(<<"server">>, LinkConfA)]])
+    ),
+    ?assertMatch(
+        {error, #{reason := required_field}},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [[maps:remove(<<"topics">>, LinkConfA)]])
+    ),
+
+    %% Some valid changes to cover different update scenarios (msg resource changed, actor changed, both changed)
+    ?assertMatch(
+        {ok, _},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [[LinkConfA]])
+    ),
+    %% A link to a cluster name and server that this test never starts is still
+    %% accepted as a valid config.
+    LinkConfUnknown = LinkConfA#{
+        <<"upstream">> => <<"no-cluster">>, <<"server">> => <<"no-cluster.emqx:31883">>
+    },
+    ?assertMatch(
+        {ok, _},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [
+            [LinkConfA#{<<"pool_size">> => 5}, LinkConfUnknown]
+        ])
+    ),
+
+    %% An empty topic list is accepted.
+    ?assertMatch(
+        {ok, _},
+        erpc:call(NodeA, emqx_cluster_link_config, update, [
+            [LinkConfA, LinkConfUnknown#{<<"topics">> => []}]
+        ])
+    ),
+
+    %% Changing the client credentials of both links at once is accepted.
+    ?assertMatch(
+        {ok, _},
+        erpc:call(
+            NodeA,
+            emqx_cluster_link_config,
+            update,
+            [
+                [
+                    LinkConfA#{
+                        <<"clientid">> => <<"new-client">>,
+                        <<"username">> => <<"user">>
+                    },
+                    LinkConfUnknown#{
+                        <<"clientid">> => <<"new-client">>,
+                        <<"username">> => <<"user">>
+                    }
+                ]
+            ]
+        )
+    ).
+
+%% Fixture of `t_config_update_ds/1`; same as the other cluster fixtures of this suite
+%% except that `conf_ds()` is merged into the config of both clusters.
+%% 'init': starts both clusters and the snabbkaffe trace, and publishes the test
+%%         environment through the CT config.
+%% 'end':  stops the trace first and then both clusters.
+t_config_update_ds('init', Config) ->
+    {PortA, PortB} = {31883, 41883},
+    ClusterNameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
+    ClusterNameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
+    BaseConfA = combine([conf_cluster(ClusterNameA), conf_log(), conf_ds()]),
+    BaseConfB = combine([conf_cluster(ClusterNameB), conf_log(), conf_ds()]),
+    {SpecsA, SpecsB} = mk_clusters(
+        ClusterNameA, ClusterNameB, PortA, PortB, BaseConfA, BaseConfB, Config
+    ),
+    StartedA = emqx_cth_cluster:start(SpecsA),
+    StartedB = emqx_cth_cluster:start(SpecsB),
+    ok = snabbkaffe:start_trace(),
+    TestEnv = [
+        {cluster_a, StartedA},
+        {cluster_b, StartedB},
+        {lport_a, PortA},
+        {lport_b, PortB},
+        {name_a, ClusterNameA},
+        {name_b, ClusterNameB}
+    ],
+    TestEnv ++ Config;
+t_config_update_ds('end', Config) ->
+    ok = snabbkaffe:stop(),
+    lists:foreach(
+        fun(ClusterKey) -> ok = emqx_cth_cluster:stop(?config(ClusterKey, Config)) end,
+        [cluster_a, cluster_b]
+    ).
+
+%% Same scenario as `t_config_update/1` (add links, verify forwarding, update cluster
+%% A's link topics, verify again) but with `conf_ds()` applied to both clusters, so
+%% more actors report `clink_route_bootstrap_complete` and the link config carries a
+%% `ps_actor_incarnation` counter.
+%%
+%% The counter must be 0 on both clusters after the links are first added. After
+%% cluster A's link is updated it must be 1 on cluster A and still 0 on cluster B,
+%% whose config is not touched again by this test.
+t_config_update_ds(Config) ->
+    [NodeA1, _, _] = ?config(cluster_a, Config),
+    [NodeB1, _] = ?config(cluster_b, Config),
+    LPortA = ?config(lport_a, Config),
+    LPortB = ?config(lport_b, Config),
+    NameA = ?config(name_a, Config),
+    NameB = ?config(name_b, Config),
+
+    ClientA = start_client("t_config_a", NodeA1, false),
+    ClientB = start_client("t_config_b", NodeB1, false),
+    {ok, _, _} = emqtt:subscribe(ClientA, <<"t/test/1/+">>, qos1),
+    {ok, _, _} = emqtt:subscribe(ClientB, <<"t/test-topic">>, qos1),
+
+    %% Each cluster connects to the MQTT listener port of the other one.
+    LinkConfA = #{
+        <<"enable">> => true,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"upstream">> => NameB
+    },
+    LinkConfB = #{
+        <<"enable">> => true,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"upstream">> => NameA
+    },
+
+    %% Subscribe to the trace before triggering the updates so no event is missed.
+    {ok, SubRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
+        %% 5 nodes = 9 actors (durable storage is enabled,
+        %% 1 replicant node is not doing ds bootstrap)
+        9,
+        30_000
+    ),
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])),
+    ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])),
+
+    ?assertMatch(
+        [#{ps_actor_incarnation := 0}], erpc:call(NodeA1, emqx, get_config, [[cluster, links]])
+    ),
+    ?assertMatch(
+        [#{ps_actor_incarnation := 0}], erpc:call(NodeB1, emqx, get_config, [[cluster, links]])
+    ),
+
+    {ok, Events} = snabbkaffe:receive_events(SubRef),
+    ?assertEqual(9, length(Events)),
+
+    {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"hello-from-a">>, qos1),
+    {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"hello-from-b">>, qos1),
+
+    ?assertReceive(
+        {publish, #{
+            topic := <<"t/test-topic">>, payload := <<"hello-from-a">>, client_pid := ClientB
+        }},
+        30_000
+    ),
+    ?assertReceive(
+        {publish, #{
+            topic := <<"t/test/1/1">>, payload := <<"hello-from-b">>, client_pid := ClientA
+        }},
+        30_000
+    ),
+    %% no more messages expected
+    ?assertNotReceive({publish, _Message = #{}}),
+    {ok, SubRef1} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
+        %% 3 nodes (1 replicant) in cluster a (5 actors including ds)
+        5,
+        30_000
+    ),
+
+    %% update link
+
+    LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])),
+
+    {ok, Events1} = snabbkaffe:receive_events(SubRef1),
+    ?assertEqual(5, length(Events1)),
+
+    %% wait for route sync on ClientA node
+    {{ok, _, _}, {ok, _}} = ?wait_async_action(
+        emqtt:subscribe(ClientA, <<"t/new/1">>, qos1),
+        #{
+            ?snk_kind := clink_route_sync_complete,
+            ?snk_meta := #{node := NodeA1},
+            actor := {<<"ps-routes-v1">>, 1}
+        },
+        10_000
+    ),
+    %% not expected to be received anymore
+    {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"not-expected-hello-from-b">>, qos1),
+    {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"hello-from-b-1">>, qos1),
+    ?assertReceive(
+        {publish, #{topic := <<"t/new/1">>, payload := <<"hello-from-b-1">>, client_pid := ClientA}},
+        30_000
+    ),
+    ?assertNotReceive({publish, _Message = #{}}),
+
+    %% Only cluster A's link was updated: its incarnation is bumped to 1 ...
+    ?assertMatch(
+        [#{ps_actor_incarnation := 1}], erpc:call(NodeA1, emqx, get_config, [[cluster, links]])
+    ),
+    %% ... while cluster B's link config was not changed and must still be at 0.
+    %% (This assertion used to be a verbatim copy of the NodeA1 check above.)
+    ?assertMatch(
+        [#{ps_actor_incarnation := 0}], erpc:call(NodeB1, emqx, get_config, [[cluster, links]])
+    ),
+
+    ok = emqtt:stop(ClientA),
+    ok = emqtt:stop(ClientB).
+
+%% Fixture of `t_misconfigured_links/1`.
+%% 'init': starts both clusters (names derived from the test case name), starts the
+%%         snabbkaffe trace and publishes clusters, listener ports and cluster names
+%%         through the CT config.
+%% 'end':  stops the trace first and then both clusters.
+t_misconfigured_links('init', Config) ->
+    {PortA, PortB} = {31883, 41883},
+    ClusterNameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
+    ClusterNameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
+    BaseConfA = combine([conf_cluster(ClusterNameA), conf_log()]),
+    BaseConfB = combine([conf_cluster(ClusterNameB), conf_log()]),
+    {SpecsA, SpecsB} = mk_clusters(
+        ClusterNameA, ClusterNameB, PortA, PortB, BaseConfA, BaseConfB, Config
+    ),
+    StartedA = emqx_cth_cluster:start(SpecsA),
+    StartedB = emqx_cth_cluster:start(SpecsB),
+    ok = snabbkaffe:start_trace(),
+    TestEnv = [
+        {cluster_a, StartedA},
+        {cluster_b, StartedB},
+        {lport_a, PortA},
+        {lport_b, PortB},
+        {name_a, ClusterNameA},
+        {name_b, ClusterNameB}
+    ],
+    TestEnv ++ Config;
+t_misconfigured_links('end', Config) ->
+    ok = snabbkaffe:stop(),
+    lists:foreach(
+        fun(ClusterKey) -> ok = emqx_cth_cluster:stop(?config(ClusterKey, Config)) end,
+        [cluster_a, cluster_b]
+    ).
+
+%% Verifies that handshake failures caused by link misconfiguration show up in
+%% the route syncer status on the node that initiates the link.
+%%
+%% Scenario, always observed from NodeA1:
+%%   1. A links to a non-existent upstream name: `bad_remote_cluster_link_name`.
+%%   2. A links to the real name of B: bootstrap completes, status is
+%%      `connected`, and the syncer for the bad name is gone.
+%%   3. B disables its link to A: `cluster_link_disabled`.
+%%   4. B only keeps a link to an unrelated name: `unknown_cluster`.
+%%
+%% The status checks after each handshake error are polled (see
+%% `wait_for_link_error/3`) instead of being guarded by a fixed short sleep,
+%% so the test does not depend on how quickly the syncer records the error
+%% after the trace event is emitted.
+t_misconfigured_links(Config) ->
+    [NodeA1, _, _] = ?config(cluster_a, Config),
+    [NodeB1, _] = ?config(cluster_b, Config),
+    LPortA = ?config(lport_a, Config),
+    LPortB = ?config(lport_b, Config),
+    NameA = ?config(name_a, Config),
+    NameB = ?config(name_b, Config),
+
+    ClientA = start_client("t_config_a", NodeA1),
+    ClientB = start_client("t_config_b", NodeB1),
+
+    {ok, _, _} = emqtt:subscribe(ClientA, <<"t/test/1/+">>, qos1),
+    {ok, _, _} = emqtt:subscribe(ClientB, <<"t/test-topic">>, qos1),
+
+    %% A's link points at B's listener but uses a wrong upstream name at first.
+    LinkConfA = #{
+        <<"enable">> => true,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"upstream">> => <<"bad-b-name">>
+    },
+    LinkConfB = #{
+        <<"enable">> => true,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
+        <<"upstream">> => NameA
+    },
+
+    ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])),
+
+    %% Step 1: wrong upstream name on A.
+    {{ok, _}, {ok, _}} = ?wait_async_action(
+        erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]]),
+        #{
+            ?snk_kind := clink_handshake_error,
+            reason := <<"bad_remote_cluster_link_name">>,
+            ?snk_meta := #{node := NodeA1}
+        },
+        10_000
+    ),
+    ok = wait_for_link_error(NodeA1, <<"bad-b-name">>, <<"bad_remote_cluster_link_name">>),
+
+    %% Step 2: correct upstream name on A.
+    {{ok, _}, {ok, _}} = ?wait_async_action(
+        erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]),
+        #{
+            ?snk_kind := clink_route_bootstrap_complete,
+            ?snk_meta := #{node := NodeA1}
+        },
+        10_000
+    ),
+    ?assertMatch(
+        #{status := connected, error := undefined},
+        erpc:call(NodeA1, emqx_cluster_link_router_syncer, status, [NameB])
+    ),
+    ?assertEqual(
+        undefined, erpc:call(NodeA1, emqx_cluster_link_router_syncer, status, [<<"bad-b-name">>])
+    ),
+
+    %% Step 3: B disables its link to A.
+    ?assertMatch(
+        {ok, _},
+        erpc:call(
+            NodeB1,
+            emqx_cluster_link_config,
+            update,
+            [
+                [
+                    LinkConfB#{<<"enable">> => false},
+                    %% An extra dummy link to keep B hook/external_broker registered and be able to
+                    %% respond with "link disabled error" for the first disabled link
+                    LinkConfB#{<<"upstream">> => <<"bad-a-name">>}
+                ]
+            ]
+        )
+    ),
+
+    %% Drop A's links before re-adding the link, then wait for the new handshake error.
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])),
+    {{ok, _}, {ok, _}} = ?wait_async_action(
+        erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]),
+        #{
+            ?snk_kind := clink_handshake_error,
+            reason := <<"cluster_link_disabled">>,
+            ?snk_meta := #{node := NodeA1}
+        },
+        10_000
+    ),
+    ok = wait_for_link_error(NodeA1, NameB, <<"cluster_link_disabled">>),
+
+    %% Step 4: B only has a link to an unrelated upstream name.
+    ?assertMatch(
+        {ok, _},
+        erpc:call(NodeB1, emqx_cluster_link_config, update, [
+            [LinkConfB#{<<"upstream">> => <<"bad-a-name">>}]
+        ])
+    ),
+    ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])),
+
+    {{ok, _}, {ok, _}} = ?wait_async_action(
+        erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]),
+        #{
+            ?snk_kind := clink_handshake_error,
+            reason := <<"unknown_cluster">>,
+            ?snk_meta := #{node := NodeA1}
+        },
+        10_000
+    ),
+    ok = wait_for_link_error(NodeA1, NameB, <<"unknown_cluster">>),
+
+    ok = emqtt:stop(ClientA),
+    ok = emqtt:stop(ClientB).
+
+%% Polls `emqx_cluster_link_router_syncer:status/1` for `LinkName` on `Node`
+%% until the returned map has `error` equal to `ExpectedError`.
+%% Returns `ok` on success; raises `{unexpected_link_status, LinkName, Status}`
+%% with the last observed status once the retries (100 x 20 ms) are used up.
+wait_for_link_error(Node, LinkName, ExpectedError) ->
+    wait_for_link_error(Node, LinkName, ExpectedError, 100).
+
+wait_for_link_error(Node, LinkName, ExpectedError, RetriesLeft) ->
+    case erpc:call(Node, emqx_cluster_link_router_syncer, status, [LinkName]) of
+        #{error := ExpectedError} ->
+            ok;
+        _NotYet when RetriesLeft > 0 ->
+            %% Pause without consuming any message from the test process mailbox.
+            receive
+            after 20 -> ok
+            end,
+            wait_for_link_error(Node, LinkName, ExpectedError, RetriesLeft - 1);
+        Status ->
+            error({unexpected_link_status, LinkName, Status})
+    end.
+
+%% Starts and connects an MQTT v5 client to the port returned by
+%% `tcp_port(Node)`. `start_client/2` uses `clean_start = true`.
+%% When `CleanStart` is exactly `false`, a 'Session-Expiry-Interval' of 300
+%% is added to the connect properties. Returns the client pid.
+start_client(ClientId, Node) ->
+    start_client(ClientId, Node, true).
+
+start_client(ClientId, Node, CleanStart) ->
+    BaseOpts = [
+        {proto_ver, v5},
+        {clientid, ClientId},
+        {port, tcp_port(Node)},
+        {clean_start, CleanStart}
+    ],
+    SessionOpts =
+        case CleanStart of
+            false -> [{properties, #{'Session-Expiry-Interval' => 300}}];
+            _ -> []
+        end,
+    {ok, Pid} = emqtt:start_link(BaseOpts ++ SessionOpts),
+    {ok, _} = emqtt:connect(Pid),
+    Pid.
+
+%% Reads the `[listeners, tcp, default, bind]` config value on `Node` and
+%% returns the second element of the resulting 2-tuple (the port).
+tcp_port(Node) ->
+    Bind = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    {_, ListenPort} = Bind,
+    ListenPort.
+
+%% Folds a non-empty list of config entries into one, merging each subsequent
+%% entry into the accumulator with `emqx_cth_suite:merge_config/2`.
+combine([First | Others]) ->
+    lists:foldl(
+        fun(Conf, Acc) -> emqx_cth_suite:merge_config(Conf, Acc) end,
+        First,
+        Others
+    ).
+
+%% Config snippet binding the `clink` TCP listener to `LPort`.
+%% Any non-integer argument yields an empty snippet.
+conf_mqtt_listener(LPort) ->
+    case is_integer(LPort) of
+        true -> fmt("listeners.tcp.clink { bind = ~p }", [LPort]);
+        false -> ""
+    end.
+
+%% Config snippet setting `cluster.name` to the given name.
+conf_cluster(Name) ->
+    fmt("cluster.name = ~s", [Name]).
+
+%% Config snippet enabling the file log at debug level, written to `node.log`,
+%% with `supervisor_reports = progress`.
+conf_log() ->
+    "log.file { enable = true, level = debug, path = node.log, supervisor_reports = progress }".
+
+%% Config snippet setting `durable_sessions.enable` to true.
+conf_ds() ->
+    "durable_sessions.enable = true".
+
+%% Shorthand for `emqx_utils:format/2`.
+fmt(Format, Params) ->
+    emqx_utils:format(Format, Params).
+
+%% Builds the node name atom `emqx_clink_<BaseName>_<Idx>`.
+mk_nodename(BaseName, Idx) ->
+    NodeName = fmt("emqx_clink_~s_~b", [BaseName, Idx]),
+    binary_to_atom(NodeName).

+ 2 - 1
apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl

@@ -123,7 +123,8 @@ t_actor_gc(_Config) ->
         [<<"global/#">>, <<"topic/#">>, <<"topic/42/+">>],
         topics_sorted()
     ),
-    _AS13 = apply_operation(heartbeat, AS12, 50_000),
+    _AS13 = apply_operation(heartbeat, AS12, 60_000),
+
     ?assertEqual(
         1,
         emqx_cluster_link_extrouter:actor_gc(env(60_000))