Explorar o código

perf(bridge-api): ask bridge listings in parallel

Also rename response formatting functions to better clarify their
purpose.
Andrew Mayorov %!s(int64=2) %!d(string=hai) anos
pai
achega
cad6492c99

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -4,6 +4,7 @@
 {emqx_authz,1}.
 {emqx_bridge,1}.
 {emqx_bridge,2}.
+{emqx_bridge,3}.
 {emqx_broker,1}.
 {emqx_cm,1}.
 {emqx_conf,1}.

+ 19 - 15
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -483,11 +483,18 @@ schema("/bridges_probe") ->
             end
     end;
 '/bridges'(get, _Params) ->
-    {200,
-        zip_bridges([
-            [format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)]
-         || Node <- mria:running_nodes()
-        ])}.
+    Nodes = mria:running_nodes(),
+    NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes),
+    case is_ok(NodeReplies) of
+        {ok, NodeBridges} ->
+            AllBridges = [
+                format_resource(Data, Node)
+             || {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges
+            ],
+            {200, zip_bridges([AllBridges])};
+        {error, Reason} ->
+            {500, error_msg('INTERNAL_ERROR', Reason)}
+    end.
 
 '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
@@ -591,7 +598,7 @@ lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
 
 do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
     Nodes = mria:running_nodes(),
-    case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
+    case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
         {ok, [{ok, _} | _] = Results} ->
             {SuccCode, FormatFun([R || {ok, R} <- Results])};
         {ok, [{error, not_found} | _]} ->
@@ -602,7 +609,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
 
 lookup_from_local_node(BridgeType, BridgeName) ->
     case emqx_bridge:lookup(BridgeType, BridgeName) of
-        {ok, Res} -> {ok, format_resp(Res)};
+        {ok, Res} -> {ok, format_resource(Res, node())};
         Error -> Error
     end.
 
@@ -802,10 +809,7 @@ aggregate_metrics(
 aggregate_metrics(#{}, Metrics) ->
     Metrics.
 
-format_resp(Data) ->
-    format_resp(Data, node()).
-
-format_resp(
+format_resource(
     #{
         type := Type,
         name := BridgeName,
@@ -974,7 +978,7 @@ do_bpapi_call(Node, Call, Args) ->
 do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
     case lists:member(SupportedVersion, supported_versions(Call)) of
         true ->
-            apply(emqx_bridge_proto_v2, Call, Args);
+            apply(emqx_bridge_proto_v3, Call, Args);
         false ->
             {error, not_implemented}
     end.
@@ -984,9 +988,9 @@ maybe_unwrap({error, not_implemented}) ->
 maybe_unwrap(RpcMulticallResult) ->
     emqx_rpc:unwrap_erpc(RpcMulticallResult).
 
-supported_versions(start_bridge_to_node) -> [2];
-supported_versions(start_bridges_to_all_nodes) -> [2];
-supported_versions(_Call) -> [1, 2].
+supported_versions(start_bridge_to_node) -> [2, 3];
+supported_versions(start_bridges_to_all_nodes) -> [2, 3];
+supported_versions(_Call) -> [1, 2, 3].
 
 to_hr_reason(nxdomain) ->
     <<"Host not found">>;

+ 4 - 0
apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl

@@ -20,6 +20,7 @@
 
 -export([
     introduced_in/0,
+    deprecated_since/0,
 
     list_bridges/1,
     restart_bridge_to_node/3,
@@ -38,6 +39,9 @@
 introduced_in() ->
     "5.0.17".
 
+deprecated_since() ->
+    "5.0.21".
+
 -spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
 list_bridges(Node) ->
     rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).

+ 128 - 0
apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl

@@ -0,0 +1,128 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_proto_v3).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    list_bridges/1,
+    list_bridges_on_nodes/1,
+    restart_bridge_to_node/3,
+    start_bridge_to_node/3,
+    stop_bridge_to_node/3,
+    lookup_from_all_nodes/3,
+    restart_bridges_to_all_nodes/3,
+    start_bridges_to_all_nodes/3,
+    stop_bridges_to_all_nodes/3
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.0.21".
+
+-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
+list_bridges(Node) ->
+    rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
+
+-spec list_bridges_on_nodes([node()]) ->
+    emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
+list_bridges_on_nodes(Nodes) ->
+    erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
+
+-type key() :: atom() | binary() | [byte()].
+
+-spec restart_bridge_to_node(node(), key(), key()) ->
+    term().
+restart_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        restart,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec start_bridge_to_node(node(), key(), key()) ->
+    term().
+start_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        start,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec stop_bridge_to_node(node(), key(), key()) ->
+    term().
+stop_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        stop,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        restart,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec start_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        start,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        stop,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec lookup_from_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_api,
+        lookup_from_local_node,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).

+ 2 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -112,6 +112,8 @@
 
 -export([apply_reply_fun/2]).
 
+-export_type([resource_data/0]).
+
 -optional_callbacks([
     on_query/3,
     on_batch_query/3,