Просмотр исходного кода

feat: support invite node in async mananer

JianBo He 2 лет назад
Родитель
Сommit
6ff4c560e4

+ 127 - 0
apps/emqx_management/src/emqx_mgmt_api_cluster.erl

@@ -26,6 +26,8 @@
     cluster_info/2,
     cluster_topology/2,
     invite_node/2,
+    invite_node_async/2,
+    get_invitation_view/2,
     force_leave/2,
     join/1,
     connected_replicants/0
@@ -42,7 +44,9 @@ paths() ->
     [
         "/cluster",
         "/cluster/topology",
+        "/cluster/invitation",
         "/cluster/:node/invite",
+        "/cluster/:node/invite_async",
         "/cluster/:node/force_leave"
     ].
 
@@ -72,6 +76,20 @@ schema("/cluster/topology") ->
             }
         }
     };
+schema("/cluster/invitation") ->
+    #{
+        'operationId' => get_invitation_view,
+        get => #{
+            desc => ?DESC(get_invitation_view),
+            tags => [<<"Cluster">>],
+            responses => #{
+                200 => ?HOCON(
+                    ?REF(invitation_view),
+                    #{desc => <<"Get invitation progress created by async operation">>}
+                )
+            }
+        }
+    };
 schema("/cluster/:node/invite") ->
     #{
         'operationId' => invite_node,
@@ -86,6 +104,19 @@ schema("/cluster/:node/invite") ->
             }
         }
     };
+schema("/cluster/:node/invite_async") ->
+    #{
+        'operationId' => invite_node_async,
+        put => #{
+            desc => ?DESC(invite_node_async),
+            tags => [<<"Cluster">>],
+            parameters => [hoconsc:ref(node)],
+            responses => #{
+                200 => <<"ok">>,
+                400 => emqx_dashboard_swagger:error_codes(['BAD_REQUEST'])
+            }
+        }
+    };
 schema("/cluster/:node/force_leave") ->
     #{
         'operationId' => force_leave,
@@ -142,6 +173,61 @@ fields(timeout) ->
                 non_neg_integer(),
                 #{desc => <<"Timeout in milliseconds">>, example => <<"15000">>}
             )}
+    ];
+fields(invitation_view) ->
+    [
+        {succeed,
+            ?HOCON(
+                ?ARRAY(?REF(node_invitation_succeed)),
+                #{desc => <<"A list of information about nodes that were successfully invited">>}
+            )},
+        {in_progress,
+            ?HOCON(
+                ?ARRAY(?REF(node_invitation_in_progress)),
+                #{desc => <<"A list of information about nodes that are processing invitations">>}
+            )},
+        {failed,
+            ?HOCON(
+                ?ARRAY(?REF(node_invitation_failed)),
+                #{desc => <<"A list of information about nodes that failed to be invited">>}
+            )}
+    ];
+fields(node_invitation_failed) ->
+    fields(node_invitation_succeed) ++
+        [
+            {reason,
+                ?HOCON(
+                    binary(),
+                    #{desc => <<"Failed reason">>, example => <<"Bad RPC to target node">>}
+                )}
+        ];
+fields(node_invitation_succeed) ->
+    fields(node_invitation_in_progress) ++
+        [
+            {finished_at,
+                ?HOCON(
+                    emqx_utils_calendar:epoch_millisecond(),
+                    #{
+                        desc => <<"The end time of the invitation task, in millisecond">>,
+                        example => <<"1705044829915">>
+                    }
+                )}
+        ];
+fields(node_invitation_in_progress) ->
+    [
+        {node,
+            ?HOCON(
+                binary(),
+                #{desc => <<"Node name">>, example => <<"emqx2@127.0.0.1">>}
+            )},
+        {started_at,
+            ?HOCON(
+                emqx_utils_calendar:epoch_millisecond(),
+                #{
+                    desc => <<"The start time of the invitation task, in millisecond">>,
+                    example => <<"1705044829915">>
+                }
+            )}
     ].
 
 validate_node(Node) ->
@@ -219,6 +305,23 @@ invite_node(put, #{bindings := #{node := Node0}, body := Body}) ->
             end
     end.
 
+invite_node_async(put, #{bindings := #{node := Node0}}) ->
+    Node = ekka_node:parse_name(binary_to_list(Node0)),
+    case emqx_mgmt_cluster:invite_async(Node) of
+        ok ->
+            {200};
+        ignore ->
+            {400, #{code => 'BAD_REQUEST', message => <<"Can't invite self">>}};
+        {error, {already_started, _Pid}} ->
+            {400, #{
+                code => 'BAD_REQUEST',
+                message => <<"The invitation task already created for this node">>
+            }}
+    end.
+
+get_invitation_view(get, _) ->
+    {200, format_invitation_view(emqx_mgmt_cluster:invitation_view())}.
+
 force_leave(delete, #{bindings := #{node := Node0}}) ->
     Node = ekka_node:parse_name(binary_to_list(Node0)),
     case ekka:force_leave(Node) of
@@ -240,3 +343,27 @@ connected_replicants() ->
 
 error_message(Msg) ->
     iolist_to_binary(io_lib:format("~p", [Msg])).
+
+format_invitation_view(#{
+    succeed := Succeed,
+    in_progress := InProgress,
+    failed := Failed
+}) ->
+    #{
+        succeed => format_invitation_info(Succeed),
+        in_progress => format_invitation_info(InProgress),
+        failed => format_invitation_info(Failed)
+    }.
+
+format_invitation_info(L) when is_list(L) ->
+    lists:map(
+        fun(Info) ->
+            Info1 = emqx_utils_maps:update_if_present(
+                started_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info
+            ),
+            emqx_utils_maps:update_if_present(
+                finished_at, fun emqx_utils_calendar:epoch_to_rfc3339/1, Info1
+            )
+        end,
+        L
+    ).

+ 196 - 0
apps/emqx_management/src/emqx_mgmt_cluster.erl

@@ -0,0 +1,196 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_cluster).
+
+-behaviour(gen_server).
+
+%% APIs
+-export([start_link/0]).
+
+-export([invite_async/1, invitation_view/0]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec invite_async(atom()) -> ok | ignore | {badrpc, any()}.
+invite_async(Node) ->
+    JoinTo = node(),
+    case Node =/= JoinTo of
+        true ->
+            gen_server:call(?MODULE, {invite_async, Node, JoinTo});
+        false ->
+            ignore
+    end.
+
+-spec invitation_view() -> map().
+invitation_view() ->
+    gen_server:call(?MODULE, invitation_view).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    process_flag(trap_exit, true),
+    {ok, #{}}.
+
+handle_call({invite_async, Node, JoinTo}, _From, State) ->
+    case maps:get(Node, State, undefined) of
+        undefined ->
+            Caller = self(),
+            Task = spawn_link_invite_worker(Node, JoinTo, Caller),
+            {reply, ok, State#{Node => Task}};
+        WorkerPid ->
+            {reply, {error, {already_started, WorkerPid}}, State}
+    end;
+handle_call(invitation_view, _From, State) ->
+    {reply, state_to_invitation_view(State), State};
+handle_call(_Request, _From, State) ->
+    Reply = ok,
+    {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({task_done, _WorkerPid, Node, Result}, State) ->
+    case maps:take(Node, State) of
+        {Task, State1} ->
+            History = maps:get(history, State1, #{}),
+            Task1 = Task#{
+                result => Result,
+                finished_at => erlang:system_time(millisecond)
+            },
+            {noreply, State1#{history => History#{Node => Task1}}};
+        error ->
+            {noreply, State}
+    end;
+handle_info({'EXIT', WorkerPid, Reason}, State) ->
+    case take_node_name_via_worker_pid(WorkerPid, State) of
+        {key_value, Node, Task, State1} ->
+            History = maps:get(history, State1, #{}),
+            Task1 = Task#{
+                result => {error, Reason},
+                finished_at => erlang:system_time(millisecond)
+            },
+            {noreply, State1#{history => History#{Node => Task1}}};
+        error ->
+            {noreply, State}
+    end;
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+spawn_link_invite_worker(Node, JoinTo, Caller) ->
+    Pid = erlang:spawn_link(
+        fun() ->
+            Result =
+                case emqx_mgmt_cluster_proto_v3:invite_node(Node, JoinTo, infinity) of
+                    ok ->
+                        ok;
+                    {error, {already_in_cluster, _Node}} ->
+                        ok;
+                    {error, _} = E ->
+                        E;
+                    {badrpc, Reason} ->
+                        {error, {badrpc, Reason}}
+                end,
+            Caller ! {task_done, self(), Node, Result}
+        end
+    ),
+    #{worker => Pid, started_at => erlang:system_time(millisecond)}.
+
+take_node_name_via_worker_pid(WorkerPid, Map) when is_map(Map) ->
+    Key = find_node_name_via_worker_pid(WorkerPid, maps:next(maps:iterator(Map))),
+    case maps:take(Key, Map) of
+        error ->
+            error;
+        {Vaule, Map1} ->
+            {key_value, Key, Vaule, Map1}
+    end.
+
+find_node_name_via_worker_pid(_WorkerPid, none) ->
+    error;
+find_node_name_via_worker_pid(WorkerPid, {Key, Task, I}) ->
+    case maps:get(worker, Task, undefined) of
+        WorkerPid ->
+            Key;
+        _ ->
+            find_node_name_via_worker_pid(WorkerPid, maps:next(I))
+    end.
+
+state_to_invitation_view(State) ->
+    History = maps:get(history, State, #{}),
+    {Succ, Failed} = lists:foldl(
+        fun({Node, Task}, {SuccAcc, FailedAcc}) ->
+            #{
+                started_at := StartedAt,
+                finished_at := FinishedAt,
+                result := Result
+            } = Task,
+            Ret = #{node => Node, started_at => StartedAt, finished_at => FinishedAt},
+            case is_succeed_result(Result) of
+                true ->
+                    {[Ret | SuccAcc], FailedAcc};
+                false ->
+                    {SuccAcc, [Ret#{reason => Result} | FailedAcc]}
+            end
+        end,
+        {[], []},
+        maps:to_list(History)
+    ),
+
+    InPro = maps:fold(
+        fun(Node, _Task = #{started_at := StartedAt}, Acc) ->
+            [#{node => Node, started_at => StartedAt} | Acc]
+        end,
+        [],
+        maps:without([history], State)
+    ),
+    #{succeed => Succ, in_progress => InPro, failed => Failed}.
+
+is_succeed_result(Result) ->
+    case Result of
+        ok ->
+            true;
+        {error, {already_in_cluster, _Node}} ->
+            true;
+        _ ->
+            false
+    end.

+ 2 - 1
apps/emqx_management/src/emqx_mgmt_sup.erl

@@ -33,7 +33,8 @@ init([]) ->
             _ ->
                 []
         end,
-    {ok, {{one_for_one, 1, 5}, Workers}}.
+    Cluster = child_spec(emqx_mgmt_cluster, 5000, worker),
+    {ok, {{one_for_one, 1, 5}, [Cluster | Workers]}}.
 
 child_spec(Mod, Shutdown, Type) ->
     #{

+ 1 - 1
apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v3.erl

@@ -30,7 +30,7 @@ introduced_in() ->
     "5.5.0".
 
 -spec invite_node(node(), node(), timeout()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc().
-invite_node(Node, Self, Timeout) when is_integer(Timeout) ->
+invite_node(Node, Self, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
     rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], Timeout).
 
 -spec connected_replicants([node()]) -> emqx_rpc:multicall_result().

+ 133 - 1
apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -38,6 +38,9 @@ init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) ->
 init_per_testcase(TC = t_cluster_invite_api_timeout, Config0) ->
     Config = [{tc_name, TC} | Config0],
     [{cluster, cluster(Config)} | setup(Config)];
+init_per_testcase(TC = t_cluster_invite_async, Config0) ->
+    Config = [{tc_name, TC} | Config0],
+    [{cluster, cluster(Config)} | setup(Config)];
 init_per_testcase(_TC, Config) ->
     emqx_mgmt_api_test_util:init_suite(?APPS),
     Config.
@@ -48,6 +51,9 @@ end_per_testcase(t_cluster_topology_api_replicants, Config) ->
 end_per_testcase(t_cluster_invite_api_timeout, Config) ->
     emqx_cth_cluster:stop(?config(cluster, Config)),
     cleanup(Config);
+end_per_testcase(t_cluster_invite_async, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config)),
+    cleanup(Config);
 end_per_testcase(_TC, _Config) ->
     emqx_mgmt_api_test_util:end_suite(?APPS).
 
@@ -164,6 +170,98 @@ t_cluster_invite_api_timeout(Config) ->
         lists:sort(Core1Resp3)
     ).
 
+t_cluster_invite_async(Config) ->
+    %% assert the cluster is created
+    [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config),
+    {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    ?assertMatch(
+        [
+            #{
+                core_node := Core1,
+                replicant_nodes :=
+                    [#{node := Replicant, streams := _}]
+            },
+            #{
+                core_node := Core2,
+                replicant_nodes :=
+                    [#{node := Replicant, streams := _}]
+            }
+        ],
+        lists:sort(Core1Resp)
+    ),
+
+    %% force leave the core2 and replicant
+    {204} = rpc:call(
+        Core1,
+        emqx_mgmt_api_cluster,
+        force_leave,
+        [delete, #{bindings => #{node => atom_to_binary(Core2)}}]
+    ),
+    %% assert the cluster is updated
+    {200, Core1Resp2} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    ?assertMatch(
+        [
+            #{
+                core_node := Core1,
+                replicant_nodes := [_]
+            }
+        ],
+        lists:sort(Core1Resp2)
+    ),
+
+    Invite = fun(Node) ->
+        Node1 = atom_to_binary(Node),
+        rpc:call(
+            Core1,
+            emqx_mgmt_api_cluster,
+            invite_node_async,
+            [put, #{bindings => #{node => Node1}}]
+        )
+    end,
+
+    %% parameter checking
+    ?assertMatch(
+        {400, #{code := 'BAD_REQUEST', message := <<"Can't invite self">>}},
+        Invite(Core1)
+    ),
+    ?assertMatch(
+        {200},
+        Invite(Core2)
+    ),
+    %% already invited
+    ?assertMatch(
+        {400, #{
+            code := 'BAD_REQUEST',
+            message := <<"The invitation task already created for this node">>
+        }},
+        Invite(Core2)
+    ),
+
+    %% assert: core2 is in_progress status
+    ?assertMatch(
+        {200, #{in_progress := [#{node := Core2}]}},
+        rpc:call(Core1, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}])
+    ),
+
+    %% waiting the async invitation_succeed
+    ?assertMatch({succeed, _}, waiting_the_async_invitation_succeed(Core1, Core2)),
+
+    {200, Core1Resp3} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    ?assertMatch(
+        [
+            #{
+                core_node := Core1,
+                replicant_nodes :=
+                    [#{node := Replicant, streams := _}]
+            },
+            #{
+                core_node := Core2,
+                replicant_nodes := _
+            }
+        ],
+        lists:sort(Core1Resp3)
+    ).
+
 cluster(Config) ->
     NodeSpec = #{apps => ?APPS},
     Nodes = emqx_cth_cluster:start(
@@ -186,3 +284,37 @@ cleanup(Config) ->
 
 work_dir(Config) ->
     filename:join(?config(priv_dir, Config), ?config(tc_name, Config)).
+
+waiting_the_async_invitation_succeed(Node, TargetNode) ->
+    waiting_the_async_invitation_succeed(Node, TargetNode, 100).
+
+waiting_the_async_invitation_succeed(_Node, _TargetNode, 0) ->
+    error(timeout);
+waiting_the_async_invitation_succeed(Node, TargetNode, N) ->
+    {200, #{
+        in_progress := InProgress,
+        succeed := Succeed,
+        failed := Failed
+    }} = rpc:call(Node, emqx_mgmt_api_cluster, get_invitation_view, [get, #{}]),
+    case find_node_info_list(TargetNode, InProgress) of
+        error ->
+            case find_node_info_list(TargetNode, Succeed) of
+                error ->
+                    case find_node_info_list(TargetNode, Failed) of
+                        error -> error;
+                        Info1 -> {failed, Info1}
+                    end;
+                Info2 ->
+                    {succeed, Info2}
+            end;
+        _Info ->
+            timer:sleep(1000),
+            waiting_the_async_invitation_succeed(Node, TargetNode, N - 1)
+    end.
+
+find_node_info_list(Node, List) ->
+    L = lists:filter(fun(#{node := N}) -> N =:= Node end, List),
+    case L of
+        [] -> error;
+        [Info] -> Info
+    end.

+ 5 - 0
rel/i18n/emqx_mgmt_api_cluster.hocon

@@ -15,6 +15,11 @@ invite_node.desc:
 invite_node.label:
 """Invite node to cluster"""
 
+invite_node_async.desc:
+"""Asynchronously invite the target node to join the cluster"""
+invite_node_async.label:
+"""Asynchronously invite the target node to join the cluster"""
+
 force_remove_node.desc:
 """Force leave node from cluster"""
 force_remove_node.label: