فهرست منبع

feat(emqx_management): add /cluster/topology HTTP API endpoint

The endpoint shows Mria RLOG cluster topology info:
connections between core and replicant nodes.

Closes: EMQX-10392
Serge Tupchii 2 سال پیش
والد
کامیت
19de10be7c

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -32,6 +32,7 @@
 {emqx_mgmt_api_plugins,1}.
 {emqx_mgmt_api_plugins,2}.
 {emqx_mgmt_cluster,1}.
+{emqx_mgmt_cluster,2}.
 {emqx_mgmt_trace,1}.
 {emqx_mgmt_trace,2}.
 {emqx_node_rebalance,1}.

+ 87 - 1
apps/emqx_management/src/emqx_mgmt_api_cluster.erl

@@ -19,9 +19,17 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -export([api_spec/0, fields/1, paths/0, schema/1, namespace/0]).
--export([cluster_info/2, invite_node/2, force_leave/2, join/1]).
+-export([
+    cluster_info/2,
+    cluster_topology/2,
+    invite_node/2,
+    force_leave/2,
+    join/1,
+    connected_replicants/0
+]).
 
 namespace() -> "cluster".
 
@@ -31,6 +39,7 @@ api_spec() ->
 paths() ->
     [
         "/cluster",
+        "/cluster/topology",
         "/cluster/:node/invite",
         "/cluster/:node/force_leave"
     ].
@@ -50,6 +59,17 @@ schema("/cluster") ->
             }
         }
     };
+schema("/cluster/topology") ->
+    #{
+        'operationId' => cluster_topology,
+        get => #{
+            desc => ?DESC(get_cluster_topology),
+            tags => [<<"Cluster">>],
+            responses => #{
+                200 => ?HOCON(?ARRAY(?REF(core_replicants)), #{desc => <<"Cluster topology">>})
+            }
+        }
+    };
 schema("/cluster/:node/invite") ->
     #{
         'operationId' => invite_node,
@@ -89,6 +109,28 @@ fields(node) ->
                     validator => fun validate_node/1
                 }
             )}
+    ];
+fields(replicant_info) ->
+    [
+        {node,
+            ?HOCON(
+                atom(),
+                #{desc => <<"Replicant node name">>, example => <<"emqx-replicant@127.0.0.2">>}
+            )},
+        {streams,
+            ?HOCON(
+                non_neg_integer(),
+                #{desc => <<"The number of RLOG (replicated log) streams">>, example => <<"10">>}
+            )}
+    ];
+fields(core_replicants) ->
+    [
+        {core_node,
+            ?HOCON(
+                atom(),
+                #{desc => <<"Core node name">>, example => <<"emqx-core@127.0.0.1">>}
+            )},
+        {replicant_nodes, ?HOCON(?ARRAY(?REF(replicant_info)))}
     ].
 
 validate_node(Node) ->
@@ -106,6 +148,46 @@ cluster_info(get, _) ->
     },
     {200, Info}.
 
+cluster_topology(get, _) ->
+    RunningCores = running_cores(),
+    {Replicants, BadNodes} = emqx_mgmt_cluster_proto_v2:connected_replicants(RunningCores),
+    CoreReplicants = lists:zip(
+        lists:filter(
+            fun(N) -> not lists:member(N, BadNodes) end,
+            RunningCores
+        ),
+        Replicants
+    ),
+    Topology = lists:map(
+        fun
+            ({Core, {badrpc, Reason}}) ->
+                ?SLOG(error, #{
+                    msg => "failed_to_get_replicant_nodes",
+                    core_node => Core,
+                    reason => Reason
+                }),
+                #{core_node => Core, replicant_nodes => []};
+            ({Core, Repls}) ->
+                #{core_node => Core, replicant_nodes => format_replicants(Repls)}
+        end,
+        CoreReplicants
+    ),
+    BadNodes =/= [] andalso ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}),
+    {200, Topology}.
+
+format_replicants(Replicants) ->
+    maps:fold(
+        fun(K, V, Acc) ->
+            [#{node => K, streams => length(V)} | Acc]
+        end,
+        [],
+        maps:groups_from_list(fun({_, N, _}) -> N end, Replicants)
+    ).
+
+running_cores() ->
+    Running = emqx:running_nodes(),
+    lists:filter(fun(C) -> lists:member(C, Running) end, emqx:cluster_nodes(cores)).
+
 invite_node(put, #{bindings := #{node := Node0}}) ->
     Node = ekka_node:parse_name(binary_to_list(Node0)),
     case emqx_mgmt_cluster_proto_v1:invite_node(Node, node()) of
@@ -134,5 +216,9 @@ force_leave(delete, #{bindings := #{node := Node0}}) ->
 join(Node) ->
     ekka:join(Node).
 
+-spec connected_replicants() -> [{atom(), node(), pid()}].
+connected_replicants() ->
+    mria_status:agents().
+
 error_message(Msg) ->
     iolist_to_binary(io_lib:format("~p", [Msg])).

+ 38 - 0
apps/emqx_management/src/proto/emqx_mgmt_cluster_proto_v2.erl

@@ -0,0 +1,38 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_cluster_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    invite_node/2,
+    connected_replicants/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.1.1".
+
+-spec invite_node(node(), node()) -> ok | ignore | {error, term()} | emqx_rpc:badrpc().
+invite_node(Node, Self) ->
+    rpc:call(Node, emqx_mgmt_api_cluster, join, [Self], 5000).
+
+-spec connected_replicants([node()]) -> emqx_rpc:multicall_result().
+connected_replicants(Nodes) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_cluster, connected_replicants, [], 30_000).

+ 102 - 0
apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl

@@ -0,0 +1,102 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_cluster_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(APPS, [emqx_conf, emqx_management]).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_) ->
+    ok.
+
+init_per_testcase(TC = t_cluster_topology_api_replicants, Config0) ->
+    Config = [{tc_name, TC} | Config0],
+    [{cluster, cluster(Config)} | setup(Config)];
+init_per_testcase(_TC, Config) ->
+    emqx_mgmt_api_test_util:init_suite(?APPS),
+    Config.
+
+end_per_testcase(t_cluster_topology_api_replicants, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config)),
+    cleanup(Config);
+end_per_testcase(_TC, _Config) ->
+    emqx_mgmt_api_test_util:end_suite(?APPS).
+
+t_cluster_topology_api_empty_resp(_) ->
+    ClusterTopologyPath = emqx_mgmt_api_test_util:api_path(["cluster", "topology"]),
+    {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, ClusterTopologyPath),
+    ?assertEqual(
+        [#{<<"core_node">> => atom_to_binary(node()), <<"replicant_nodes">> => []}],
+        emqx_utils_json:decode(Resp, [return_maps])
+    ).
+
+t_cluster_topology_api_replicants(Config) ->
+    %% some time to stabilize
+    timer:sleep(3000),
+    [Core1, Core2, Replicant] = _NodesList = ?config(cluster, Config),
+    {200, Core1Resp} = rpc:call(Core1, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    {200, Core2Resp} = rpc:call(Core2, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    {200, ReplResp} = rpc:call(Replicant, emqx_mgmt_api_cluster, cluster_topology, [get, #{}]),
+    [
+        ?assertMatch(
+            [
+                #{
+                    core_node := Core1,
+                    replicant_nodes :=
+                        [#{node := Replicant, streams := _}]
+                },
+                #{
+                    core_node := Core2,
+                    replicant_nodes :=
+                        [#{node := Replicant, streams := _}]
+                }
+            ],
+            Resp
+        )
+     || Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]]
+    ].
+
+cluster(Config) ->
+    Nodes = emqx_cth_cluster:start(
+        [
+            {data_backup_core1, #{role => core, apps => ?APPS}},
+            {data_backup_core2, #{role => core, apps => ?APPS}},
+            {data_backup_replicant, #{role => replicant, apps => ?APPS}}
+        ],
+        #{work_dir => work_dir(Config)}
+    ),
+    Nodes.
+
+setup(Config) ->
+    WorkDir = filename:join(work_dir(Config), local),
+    Started = emqx_cth_suite:start(?APPS, #{work_dir => WorkDir}),
+    [{suite_apps, Started} | Config].
+
+cleanup(Config) ->
+    emqx_cth_suite:stop(?config(suite_apps, Config)).
+
+work_dir(Config) ->
+    filename:join(?config(priv_dir, Config), ?config(tc_name, Config)).

+ 3 - 0
changes/ce/feat-11251.en.md

@@ -0,0 +1,3 @@
+Add `/cluster/topology` HTTP API endpoint
+
+`GET` request to the endpoint returns the cluster topology: connections between RLOG core and replicant nodes.

+ 5 - 0
rel/i18n/emqx_mgmt_api_cluster.hocon

@@ -5,6 +5,11 @@ get_cluster_info.desc:
 get_cluster_info.label:
 """Get cluster info"""
 
+get_cluster_topology.desc:
+"""Get RLOG cluster topology: connections between core and replicant nodes."""
+get_cluster_topology.label:
+"""Get cluster topology"""
+
 invite_node.desc:
 """Invite node to cluster"""
 invite_node.label: