Sfoglia il codice sorgente

feat(cluster): expose the timeout parameter to invite node

JianBo He 2 anni fa
parent
commit
a08f56db42

+ 28 - 10
apps/emqx_management/src/emqx_mgmt_api_cluster.erl

@@ -31,6 +31,8 @@
     connected_replicants/0
 ]).
 
+-define(DEFAULT_INVITE_TIMEOUT, 15000).
+
 namespace() -> "cluster".
 
 api_spec() ->
@@ -77,6 +79,7 @@ schema("/cluster/:node/invite") ->
             desc => ?DESC(invite_node),
             tags => [<<"Cluster">>],
             parameters => [hoconsc:ref(node)],
+            'requestBody' => hoconsc:ref(timeout),
             responses => #{
                 200 => <<"ok">>,
                 400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST'])
@@ -131,6 +134,14 @@ fields(core_replicants) ->
                 #{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>}
             )},
         {replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))}
+    ];
+fields(timeout) ->
+    [
+        {timeout,
+            ?HOCON(
+                non_neg_integer(),
+                #{desc => <<"Timeout in milliseconds">>, example => <<"15000">>}
+            )}
     ].
 
 validate_node(Node) ->
@@ -188,17 +199,24 @@ running_cores() ->
     Running = emqx:running_nodes(),
     lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)).
 
-invite_node(put, #{bindings := #{node := Node0}}) ->
+invite_node(put, #{bindings := #{node := Node0}, body := Body}) ->
     Node = ekka_node:parse_name(binary_to_list(Node0)),
-    case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of
-        ok ->
-            {200};
-        ignore ->
-            {400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}};
-        {badrpc, Error} ->
-            {400, #{code => 'BAD_REQUEST', message => error_message(Error)}};
-        {error, Error} ->
-            {400, #{code => 'BAD_REQUEST', message => error_message(Error)}}
+    case maps:get(<<"timeout">>, Body, ?DEFAULT_INVITE_TIMEOUT) of
+        T when not is_integer(T) ->
+            {400, #{code => 'BAD_REQUEST', message => <<"timeout must be integer">>}};
+        T when T < 5000 ->
+            {400, #{code => 'BAD_REQUEST', message => <<"timeout can't less than 5000ms">>}};
+        Timeout ->
+            case emqx_mgmt_cluster_proto_v3:invite_node(Node, node(), Timeout) of
+                ok ->
+                    {200};
+                ignore ->
+                    {400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}};
+                {badrpc, Error} ->
+                    {400, #{code => 'BAD_REQUEST', message => error_message(Error)}};
+                {error, Error} ->
+                    {400, #{code => 'BAD_REQUEST', message => error_message(Error)}}
+            end
     end.
 
 force_leave(delete, #{bindings := #{node := Node0}}) ->

+ 38 - 0
apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl

@@ -0,0 +1,38 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_cluster_proto_v3).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    invite_node/3,
+    connected_replicants/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.5.0".
+
+-spec invite_node(node(), node(), timeout()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc().
+invite_node(Node, Self, Timeout) when is_integer(Timeout) ->
+    rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], Timeout).
+
+-spec connected_replicants([node()]) -> emqx_rpc:multicall_result().
+connected_replicants(Nodes) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_cluster, connected_replicants, [], 30_000).

+ 91 - 3
apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl

@@ -35,6 +35,9 @@ end_per_suite(_) ->
 init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) ->
     Config = [{tc_name, TC} | Config0],
     [{cluster, cluster(Config)} | setup(Config)];
+init_per_testcase(TC = t_cluster_invite_api_timeout, Config0) ->
+    Config = [{tc_name, TC} | Config0],
+    [{cluster, cluster(Config)} | setup(Config)];
 init_per_testcase(_TC, Config) ->
     emqx_mgmt_api_test_util:init_suite(?APPS),
     Config.
@@ -42,6 +45,9 @@ init_per_testcase(_TC, Config) ->
 end_per_testcase(t_cluster_topology_api_replicants, Config) ->
     emqx_cth_cluster:stop(?config(cluster, Config)),
     cleanup(Config);
+end_per_testcase(t_cluster_invite_api_timeout, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config)),
+    cleanup(Config);
 end_per_testcase(_TC, _Config) ->
     emqx_mgmt_api_test_util:end_suite(?APPS).
 
@@ -77,12 +83,94 @@ t_cluster_topology_api_replicants(Config) ->
      || Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]]
     ].
 
+t_cluster_invite_api_timeout(Config) ->
+    %% assert the cluster is created
+    [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config),
+    {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    ?assertMatch(
+        [
+            #{
+                core_node := Core1,
+                replicant_nodes :=
+                    [#{node := Replicant, streams := _}]
+            },
+            #{
+                core_node := Core2,
+                replicant_nodes :=
+                    [#{node := Replicant, streams := _}]
+            }
+        ],
+        lists:sort(Core1Resp)
+    ),
+
+    %% force leave the core2
+    {204} = rpc:call(
+        Core1,
+        emqx_mgmt_api_cluster,
+        force_leave,
+        [delete, #{bindings => #{node => atom_to_binary(Core2)}}]
+    ),
+
+    %% assert the cluster is updated
+    {200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    ?assertMatch(
+        [
+            #{
+                core_node := Core1,
+                replicant_nodes :=
+                    [#{node := Replicant, streams := _}]
+            }
+        ],
+        lists:sort(Core1Resp2)
+    ),
+
+    %% assert timeout parameter checking
+    Invite = fun(Node, Timeout) ->
+        Node1 = atom_to_binary(Node),
+        rpc:call(
+            Core1,
+            emqx_mgmt_api_cluster,
+            invite_node,
+            [put, #{bindings => #{node => Node1}, body => #{<<"timeout">> => Timeout}}]
+        )
+    end,
+    ?assertMatch(
+        {400, #{code := 'BAD_REQUEST', message := <<"timeout must be integer">>}},
+        Invite(Core2, not_a_integer_timeout)
+    ),
+    ?assertMatch(
+        {400, #{code := 'BAD_REQUEST', message := <<"timeout can't less than 5000ms">>}},
+        Invite(Core2, 3000)
+    ),
+
+    %% assert cluster is updated after invite
+    ?assertMatch(
+        {200},
+        Invite(Core2, 15000)
+    ),
+    {200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    ?assertMatch(
+        [
+            #{
+                core_node := Core1,
+                replicant_nodes :=
+                    [#{node := Replicant, streams := _}]
+            },
+            #{
+                core_node := Core2,
+                replicant_nodes := _
+            }
+        ],
+        lists:sort(Core1Resp3)
+    ).
+
 cluster(Config) ->
+    NodeSpec = #{apps => ?APPS},
     Nodes = emqx_cth_cluster:start(
         [
-            {data_backup_core1, #{role => core, apps => ?APPS}},
-            {data_backup_core2, #{role => core, apps => ?APPS}},
-            {data_backup_replicant, #{role => replicant, apps => ?APPS}}
+            {data_backup_core1, NodeSpec#{role => core}},
+            {data_backup_core2, NodeSpec#{role => core}},
+            {data_backup_replicant, NodeSpec#{role => replicant}}
         ],
         #{work_dir => work_dir(Config)}
     ),