Selaa lähdekoodia

Merge pull request #13932 from keynslug/test/clink/simplify-config-ct

test(cluster-link): simplify config test suite
Andrew Mayorov 1 vuosi sitten
vanhempi
commit
5a041d21c2
1 muutettua tiedostoa jossa 94 lisäystä ja 126 poistoa
  1. 94 126
      apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl

+ 94 - 126
apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl

@@ -12,6 +12,9 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-define(BASE_CLINK_MQTT_PORT, 1883).
+-define(BASE_CLUSTER_NODE_PORT, 10000).
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -22,56 +25,41 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(TCName, Config) ->
-    emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config).
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, [{tc_name, TCName} | Config]).
 
 end_per_testcase(TCName, Config) ->
-    %% @NOTE: Clean work_dir for this TC to avoid running out of disk space
-    %% causing other test run flaky. Uncomment it if you need to preserve the
-    %% work_dir for troubleshooting
-    t_config_update_ds =:= TCName andalso
-        emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
     emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
 
-mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) ->
-    AppsA = [{emqx_conf, ConfA}, emqx_cluster_link],
-    AppsA1 = [
-        {emqx_conf, combine([ConfA, conf_mqtt_listener(PortA)])},
-        emqx_cluster_link
-    ],
-    AppsB = [{emqx_conf, ConfB}, emqx_cluster_link],
-    AppsB1 = [
-        {emqx_conf, combine([ConfB, conf_mqtt_listener(PortB)])},
-        emqx_cluster_link
+mk_cluster(N, ClusterName, BaseSpecs, ExtraConf, CTConfig) when is_list(BaseSpecs) ->
+    Specs = [
+        mk_cluster_nodespec(N, ClusterName, S, I, ExtraConf)
+     || {I, S} <- lists:enumerate(BaseSpecs)
     ],
+    emqx_cth_cluster:mk_nodespecs(
+        Specs,
+        #{work_dir => emqx_cth_suite:work_dir(CTConfig)}
+    );
+mk_cluster(N, ClusterName, Size, ExtraConf, CTConfig) when is_integer(Size) ->
+    mk_cluster(N, ClusterName, lists:duplicate(Size, #{}), ExtraConf, CTConfig).
+
+mk_cluster_nodespec(N, ClusterName, BaseSpec, NodeI, ExtraConf) ->
+    Conf = mk_emqx_conf(N, ClusterName, NodeI, ExtraConf),
+    Spec = BaseSpec#{
+        apps => [{emqx_conf, Conf}, emqx_cluster_link],
+        base_port => N * ?BASE_CLUSTER_NODE_PORT + NodeI * 100
+    },
+    {mk_nodename(ClusterName, NodeI), Spec}.
 
-    NodesA = emqx_cth_cluster:mk_nodespecs(
-        [
-            {mk_nodename(NameA, 1), #{apps => AppsA}},
-            {mk_nodename(NameA, 2), #{apps => AppsA}},
-            {mk_nodename(NameA, 3), #{apps => AppsA1, role => replicant}}
-        ],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    NodesB = emqx_cth_cluster:mk_nodespecs(
-        [
-            {mk_nodename(NameB, 1), #{apps => AppsB, base_port => 20100}},
-            {mk_nodename(NameB, 2), #{apps => AppsB1, base_port => 20200}}
-        ],
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    {NodesA, NodesB}.
+mk_emqx_conf(N, ClusterName, _NodeI = 1, ExtraConf) ->
+    MQTTPort = ?BASE_CLINK_MQTT_PORT + N * 10000,
+    ListenerConf = conf_mqtt_listener(MQTTPort),
+    combine([conf_cluster(ClusterName), ListenerConf, ExtraConf]);
+mk_emqx_conf(_, ClusterName, _NodeI, ExtraConf) ->
+    combine([conf_cluster(ClusterName), ExtraConf]).
 
 t_config_update_cli('init', Config0) ->
-    Config1 =
-        [
-            {name_prefix, ?FUNCTION_NAME}
-            | Config0
-        ],
-    Config2 = t_config_update('init', Config1),
-    [
-        {update_from, cli}
-        | lists:keydelete(update_from, 1, Config2)
-    ];
+    Config = t_config_update('init', Config0),
+    lists:keystore(update_from, 1, Config, {update_from, cli});
 t_config_update_cli('end', Config) ->
     t_config_update('end', Config).
 
@@ -79,26 +67,17 @@ t_config_update_cli(Config) ->
     t_config_update(Config).
 
 t_config_update('init', Config) ->
-    NamePrefix =
-        case ?config(name_prefix, Config) of
-            undefined -> ?FUNCTION_NAME;
-            Name -> Name
-        end,
+    NamePrefix = ?config(tc_name, Config),
     NameA = fmt("~s_~s", [NamePrefix, "a"]),
     NameB = fmt("~s_~s", [NamePrefix, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log()]),
-    ConfB = combine([conf_cluster(NameB), conf_log()]),
-    {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
+    NodesA = mk_cluster(1, NameA, 2, conf_log(), Config),
+    NodesB = mk_cluster(2, NameB, 2, conf_log(), Config),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     [
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_b, NameB},
         {update_from, api}
@@ -110,10 +89,10 @@ t_config_update('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
 
 t_config_update(Config) ->
-    [NodeA1, _, _] = ?config(cluster_a, Config),
-    [NodeB1, _] = ?config(cluster_b, Config),
-    LPortA = ?config(lport_a, Config),
-    LPortB = ?config(lport_b, Config),
+    ClusterA = [NodeA1 | _] = ?config(cluster_a, Config),
+    ClusterB = [NodeB1 | _] = ?config(cluster_b, Config),
+    LPortA = tcp_port(NodeA1, clink),
+    LPortB = tcp_port(NodeB1, clink),
     NameA = ?config(name_a, Config),
     NameB = ?config(name_b, Config),
 
@@ -127,35 +106,29 @@ t_config_update(Config) ->
     LinkConfA = #{
         <<"enable">> => true,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameB
     },
     LinkConfB = #{
         <<"enable">> => true,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortA]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameA
     },
 
     {ok, SubRef} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 5 nodes = 5 actors (durable storage is disabled)
-        5,
+        %% Num nodes = num actors (durable storage is disabled)
+        length(ClusterA) + length(ClusterB),
         30_000
     ),
     ?assertMatch({ok, _}, update(NodeA1, [LinkConfA], Config)),
     ?assertMatch({ok, _}, update(NodeB1, [LinkConfB], Config)),
 
     ?assertMatch(
-        {ok, [
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete}
-        ]},
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
         snabbkaffe:receive_events(SubRef)
     ),
 
@@ -179,8 +152,7 @@ t_config_update(Config) ->
 
     {ok, SubRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 3 nodes in cluster a
-        3,
+        length(ClusterA),
         30_000
     ),
 
@@ -189,11 +161,7 @@ t_config_update(Config) ->
     ?assertMatch({ok, _}, update(NodeA1, [LinkConfA1], Config)),
 
     ?assertMatch(
-        {ok, [
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete},
-            #{?snk_kind := clink_route_bootstrap_complete}
-        ]},
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
         snabbkaffe:receive_events(SubRef1)
     ),
 
@@ -237,20 +205,14 @@ t_config_update(Config) ->
 t_config_validations('init', Config) ->
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log()]),
-    ConfB = combine([conf_cluster(NameB), conf_log()]),
-    %% Single node clusters are enough for a basic validation test
-    {[NodeA, _, _], [NodeB, _]} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
-    ClusterA = emqx_cth_cluster:start([NodeA]),
-    ClusterB = emqx_cth_cluster:start([NodeB]),
+    NodesA = mk_cluster(1, NameA, 1, conf_log(), Config),
+    NodesB = mk_cluster(2, NameB, 1, conf_log(), Config),
+    ClusterA = emqx_cth_cluster:start(NodesA),
+    ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     [
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_b, NameB}
         | Config
@@ -262,14 +224,15 @@ t_config_validations('end', Config) ->
 
 t_config_validations(Config) ->
     [NodeA] = ?config(cluster_a, Config),
-    LPortB = ?config(lport_b, Config),
-
+    [NodeB] = ?config(cluster_b, Config),
     NameB = ?config(name_b, Config),
 
+    LPortB = tcp_port(NodeB, clink),
+
     LinkConfA = #{
         <<"enable">> => true,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameB
     },
@@ -352,21 +315,17 @@ t_config_validations(Config) ->
     ).
 
 t_config_update_ds('init', Config) ->
+    Conf = combine([conf_log(), conf_ds()]),
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log(), conf_ds()]),
-    ConfB = combine([conf_cluster(NameB), conf_log(), conf_ds()]),
-    {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
+    NodesA = mk_cluster(1, NameA, [#{role => replicant}, #{role => core}], Conf, Config),
+    NodesB = mk_cluster(2, NameB, [#{role => replicant}, #{role => core}], Conf, Config),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     [
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_b, NameB}
         | Config
@@ -374,15 +333,19 @@ t_config_update_ds('init', Config) ->
 t_config_update_ds('end', Config) ->
     ok = snabbkaffe:stop(),
     ok = emqx_cth_cluster:stop(?config(cluster_a, Config)),
-    ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
+    ok = emqx_cth_cluster:stop(?config(cluster_b, Config)),
+    %% @NOTE: Clean work_dir for this TC to avoid running out of disk space
+    %% causing other test run flaky. Uncomment it if you need to preserve the
+    %% work_dir for troubleshooting
+    emqx_cth_suite:clean_work_dir(?config(work_dir, Config)).
 
 t_config_update_ds(Config) ->
     %% @NOTE: for troubleshooting this TC,
     %% take a look in end_per_testcase/2 to preserve the work dir
-    [NodeA1, _, _] = ?config(cluster_a, Config),
-    [NodeB1, _] = ?config(cluster_b, Config),
-    LPortA = ?config(lport_a, Config),
-    LPortB = ?config(lport_b, Config),
+    [NodeA1 | _] = ?config(cluster_a, Config),
+    [NodeB1 | _] = ?config(cluster_b, Config),
+    LPortA = tcp_port(NodeA1, clink),
+    LPortB = tcp_port(NodeB1, clink),
     NameA = ?config(name_a, Config),
     NameB = ?config(name_b, Config),
 
@@ -394,23 +357,22 @@ t_config_update_ds(Config) ->
     LinkConfA = #{
         <<"enable">> => true,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameB
     },
     LinkConfB = #{
         <<"enable">> => true,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortA]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameA
     },
 
     {ok, SubRef} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 5 nodes = 9 actors (durable storage is enabled,
-        %% 1 replicant node is not doing ds bootstrap)
-        9,
+        %% 2 cores = 4 actors (durable storage enabled) + 2 replicants = 2 more actors
+        6,
         30_000
     ),
     ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])),
@@ -423,8 +385,10 @@ t_config_update_ds(Config) ->
         [#{ps_actor_incarnation := 0}], erpc:call(NodeB1, emqx, get_config, [[cluster, links]])
     ),
 
-    {ok, Events} = snabbkaffe:receive_events(SubRef),
-    ?assertEqual(9, length(Events)),
+    ?assertMatch(
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
+        snabbkaffe:receive_events(SubRef)
+    ),
 
     {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"hello-from-a">>, qos1),
     {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"hello-from-b">>, qos1),
@@ -445,8 +409,8 @@ t_config_update_ds(Config) ->
     ?assertNotReceive({publish, _Message = #{}}),
     {ok, SubRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := clink_route_bootstrap_complete}),
-        %% 3 nodes (1 replicant) in cluster a (5 actors including ds)
-        5,
+        %% 2 nodes (1 replicant) in cluster a (3 actors including ds)
+        3,
         30_000
     ),
 
@@ -455,8 +419,10 @@ t_config_update_ds(Config) ->
     LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]},
     ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])),
 
-    {ok, Events1} = snabbkaffe:receive_events(SubRef1),
-    ?assertEqual(5, length(Events1)),
+    ?assertMatch(
+        {ok, [#{?snk_kind := clink_route_bootstrap_complete} | _]},
+        snabbkaffe:receive_events(SubRef1)
+    ),
 
     %% wait for route sync on ClientA node
     {{ok, _, _}, {ok, _}} = ?wait_async_action(
@@ -490,19 +456,14 @@ t_config_update_ds(Config) ->
 t_misconfigured_links('init', Config) ->
     NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]),
     NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]),
-    LPortA = 31883,
-    LPortB = 41883,
-    ConfA = combine([conf_cluster(NameA), conf_log()]),
-    ConfB = combine([conf_cluster(NameB), conf_log()]),
-    {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config),
+    NodesA = mk_cluster(1, NameA, [#{role => replicant}, #{role => core}], conf_log(), Config),
+    NodesB = mk_cluster(2, NameB, [#{role => core}], conf_log(), Config),
     ClusterA = emqx_cth_cluster:start(NodesA),
     ClusterB = emqx_cth_cluster:start(NodesB),
     ok = snabbkaffe:start_trace(),
     [
         {cluster_a, ClusterA},
         {cluster_b, ClusterB},
-        {lport_a, LPortA},
-        {lport_b, LPortB},
         {name_a, NameA},
         {name_b, NameB}
         | Config
@@ -513,10 +474,10 @@ t_misconfigured_links('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
 
 t_misconfigured_links(Config) ->
-    [NodeA1, _, _] = ?config(cluster_a, Config),
-    [NodeB1, _] = ?config(cluster_b, Config),
-    LPortA = ?config(lport_a, Config),
-    LPortB = ?config(lport_b, Config),
+    [NodeA1 | _] = ?config(cluster_a, Config),
+    [NodeB1 | _] = ?config(cluster_b, Config),
+    LPortA = tcp_port(NodeA1, clink),
+    LPortB = tcp_port(NodeB1, clink),
     NameA = ?config(name_a, Config),
     NameB = ?config(name_b, Config),
 
@@ -529,14 +490,14 @@ t_misconfigured_links(Config) ->
     LinkConfA = #{
         <<"enable">> => true,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortB]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => <<"bad-b-name">>
     },
     LinkConfB = #{
         <<"enable">> => true,
         <<"pool_size">> => 1,
-        <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>,
+        <<"server">> => fmt("localhost:~p", [LPortA]),
         <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
         <<"name">> => NameA
     },
@@ -651,7 +612,14 @@ start_client(ClientId, Node, CleanStart) ->
     Client.
 
 tcp_port(Node) ->
-    {_Host, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    tcp_port(Node, default).
+
+tcp_port(Node, Listener) ->
+    get_bind_port(erpc:call(Node, emqx_config, get, [[listeners, tcp, Listener, bind]])).
+
+get_bind_port({_Host, Port}) ->
+    Port;
+get_bind_port(Port) when is_integer(Port) ->
     Port.
 
 combine([Entry | Rest]) ->