Browse Source

refactor(action_api): prepare for `/sources` HTTP API

Thales Macedo Garitezi 2 years atrás
parent
commit
6511693b2e

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -8,6 +8,7 @@
 {emqx_bridge,3}.
 {emqx_bridge,4}.
 {emqx_bridge,5}.
+{emqx_bridge,6}.
 {emqx_broker,1}.
 {emqx_cm,1}.
 {emqx_cm,2}.

+ 17 - 7
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -49,6 +49,11 @@
 -export([lookup_from_local_node/2]).
 -export([get_metrics_from_local_node/2]).
 
+%% only for testting/mocking
+-export([supported_versions/1]).
+
+-define(BPAPI_NAME, emqx_bridge).
+
 -define(BRIDGE_NOT_ENABLED,
     ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>)
 ).
@@ -1102,18 +1107,18 @@ maybe_try_restart(_, _, _) ->
 
 do_bpapi_call(all, Call, Args) ->
     maybe_unwrap(
-        do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args)
+        do_bpapi_call_vsn(emqx_bpapi:supported_version(?BPAPI_NAME), Call, Args)
     );
 do_bpapi_call(Node, Call, Args) ->
     case lists:member(Node, mria:running_nodes()) of
         true ->
-            do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args);
+            do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, ?BPAPI_NAME), Call, Args);
         false ->
             {error, {node_not_found, Node}}
     end.
 
 do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
-    case lists:member(SupportedVersion, supported_versions(Call)) of
+    case lists:member(SupportedVersion, ?MODULE:supported_versions(Call)) of
         true ->
             apply(emqx_bridge_proto_v4, Call, Args);
         false ->
@@ -1125,10 +1130,15 @@ maybe_unwrap({error, not_implemented}) ->
 maybe_unwrap(RpcMulticallResult) ->
     emqx_rpc:unwrap_erpc(RpcMulticallResult).
 
-supported_versions(start_bridge_to_node) -> [2, 3, 4, 5];
-supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4, 5];
-supported_versions(get_metrics_from_all_nodes) -> [4, 5];
-supported_versions(_Call) -> [1, 2, 3, 4, 5].
+supported_versions(start_bridge_to_node) -> bpapi_version_range(2, latest);
+supported_versions(start_bridges_to_all_nodes) -> bpapi_version_range(2, latest);
+supported_versions(get_metrics_from_all_nodes) -> bpapi_version_range(4, latest);
+supported_versions(_Call) -> bpapi_version_range(1, latest).
+
+%% [From, To] (inclusive on both ends)
+bpapi_version_range(From, latest) ->
+    ThisNodeVsn = emqx_bpapi:supported_version(node(), ?BPAPI_NAME),
+    lists:seq(From, ThisNodeVsn).
 
 redact(Term) ->
     emqx_utils:redact(Term).

+ 34 - 8
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -43,6 +43,7 @@
     lookup/2,
     lookup/3,
     create/3,
+    create/4,
     %% The remove/2 function is only for internal use as it may create
     %% rules with broken dependencies
     remove/2,
@@ -50,7 +51,8 @@
     %% The following is the remove function that is called by the HTTP API
     %% It also checks for rule action dependencies and optionally removes
     %% them
-    check_deps_and_remove/3
+    check_deps_and_remove/3,
+    check_deps_and_remove/4
 ]).
 
 %% Operations
@@ -62,9 +64,11 @@
     send_message/4,
     query/4,
     start/2,
+    start/3,
     reset_metrics/2,
     reset_metrics/3,
     create_dry_run/2,
+    create_dry_run/3,
     get_metrics/2,
     get_metrics/3
 ]).
@@ -150,6 +154,10 @@
 -type bridge_v2_type() :: binary() | atom() | [byte()].
 -type bridge_v2_name() :: binary() | atom() | [byte()].
 
+-type root_cfg_key() :: ?ROOT_KEY_ACTIONS | ?ROOT_KEY_SOURCES.
+
+-export_type([root_cfg_key/0]).
+
 %%====================================================================
 
 %%====================================================================
@@ -212,7 +220,7 @@ unload_bridges(ConfRooKey) ->
 lookup(Type, Name) ->
     lookup(?ROOT_KEY_ACTIONS, Type, Name).
 
--spec lookup(sources | actions, bridge_v2_type(), bridge_v2_name()) ->
+-spec lookup(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) ->
     {ok, bridge_v2_info()} | {error, not_found}.
 lookup(ConfRootName, Type, Name) ->
     case emqx:get_raw_config([ConfRootName, Type, Name], not_found) of
@@ -315,6 +323,11 @@ remove(ConfRootKey, BridgeType, BridgeName) ->
 
 -spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}.
 check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) ->
+    check_deps_and_remove(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, AlsoDeleteActions).
+
+-spec check_deps_and_remove(root_cfg_key(), bridge_v2_type(), bridge_v2_name(), boolean()) ->
+    ok | {error, any()}.
+check_deps_and_remove(ConfRooKey, BridgeType, BridgeName, AlsoDeleteActions) ->
     AlsoDelete =
         case AlsoDeleteActions of
             true -> [rule_actions];
@@ -328,7 +341,7 @@ check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) ->
         )
     of
         ok ->
-            remove(BridgeType, BridgeName);
+            remove(ConfRooKey, BridgeType, BridgeName);
         {error, Reason} ->
             {error, Reason}
     end.
@@ -539,11 +552,14 @@ disable_enable(ConfRootKey, Action, BridgeType, BridgeName) when ?ENABLE_OR_DISA
 %% is something else than connected after starting the connector or if an
 %% error occurred when the connector was started.
 -spec start(term(), term()) -> ok | {error, Reason :: term()}.
-start(BridgeV2Type, Name) ->
+start(ActionOrSourceType, Name) ->
+    start(?ROOT_KEY_ACTIONS, ActionOrSourceType, Name).
+
+-spec start(root_cfg_key(), term(), term()) -> ok | {error, Reason :: term()}.
+start(ConfRootKey, BridgeV2Type, Name) ->
     ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
         emqx_connector_resource:start(ConnectorType, ConnectorName)
     end,
-    ConfRootKey = ?ROOT_KEY_ACTIONS,
     connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, true).
 
 connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) ->
@@ -694,10 +710,15 @@ health_check(ConfRootKey, BridgeType, BridgeName) ->
     end.
 
 -spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}.
-create_dry_run(Type, Conf0) ->
+create_dry_run(Type, Conf) ->
+    create_dry_run(?ROOT_KEY_ACTIONS, Type, Conf).
+
+-spec create_dry_run(root_cfg_key(), bridge_v2_type(), Config :: map()) -> ok | {error, term()}.
+create_dry_run(ConfRootKey, Type, Conf0) ->
     Conf1 = maps:without([<<"name">>], Conf0),
     TypeBin = bin(Type),
-    RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
+    ConfRootKeyBin = bin(ConfRootKey),
+    RawConf = #{ConfRootKeyBin => #{TypeBin => #{<<"temp_name">> => Conf1}}},
     %% Check config
     try
         _ =
@@ -722,6 +743,9 @@ create_dry_run(Type, Conf0) ->
     end.
 
 create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
+    create_dry_run_helper(?ROOT_KEY_ACTIONS, BridgeType, ConnectorRawConf, BridgeV2RawConf).
+
+create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
     BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
     ConnectorType = connector_type(BridgeType),
     OnReadyCallback =
@@ -730,7 +754,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
             ChannelTestId = id(BridgeType, BridgeName, ConnectorName),
             Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf),
             AugmentedConf = augment_channel_config(
-                ?ROOT_KEY_ACTIONS,
+                ConfRootKey,
                 BridgeType,
                 BridgeName,
                 Conf
@@ -756,6 +780,8 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
 get_metrics(Type, Name) ->
     get_metrics(?ROOT_KEY_ACTIONS, Type, Name).
 
+-spec get_metrics(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) ->
+    emqx_metrics_worker:metrics().
 get_metrics(ConfRootKey, Type, Name) ->
     emqx_resource:get_metrics(id_with_root_name(ConfRootKey, Type, Name)).
 

+ 145 - 71
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -26,6 +26,9 @@
 -import(hoconsc, [mk/2, array/1, enum/1]).
 -import(emqx_utils, [redact/1]).
 
+-define(ROOT_KEY_ACTIONS, actions).
+-define(ROOT_KEY_SOURCES, sources).
+
 %% Swagger specs from hocon schema
 -export([
     api_spec/0,
@@ -48,7 +51,14 @@
 ]).
 
 %% BpAPI / RPC Targets
--export([lookup_from_local_node/2, get_metrics_from_local_node/2]).
+-export([
+    lookup_from_local_node/2,
+    get_metrics_from_local_node/2,
+    lookup_from_local_node_v6/3,
+    get_metrics_from_local_node_v6/3
+]).
+
+-define(BPAPI_NAME, emqx_bridge).
 
 -define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME),
     ?NOT_FOUND(
@@ -393,16 +403,51 @@ schema("/action_types") ->
     }.
 
 '/actions'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
-    case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
-        {ok, _} ->
-            ?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>);
-        {error, not_found} ->
-            Conf = filter_out_request_body(Conf0),
-            create_bridge(BridgeType, BridgeName, Conf)
-    end;
+    handle_create(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, Conf0);
 '/actions'(get, _Params) ->
-    Nodes = mria:running_nodes(),
-    NodeReplies = emqx_bridge_proto_v5:v2_list_bridges_on_nodes(Nodes),
+    handle_list(?ROOT_KEY_ACTIONS).
+
+'/actions/:id'(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(Id, lookup_from_all_nodes(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, 200));
+'/actions/:id'(put, #{bindings := #{id := Id}, body := Conf0}) ->
+    handle_update(?ROOT_KEY_ACTIONS, Id, Conf0);
+'/actions/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) ->
+    handle_delete(?ROOT_KEY_ACTIONS, Id, Qs).
+
+'/actions/:id/metrics'(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(?ROOT_KEY_ACTIONS, BridgeType, BridgeName)).
+
+'/actions/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
+    handle_reset_metrics(?ROOT_KEY_ACTIONS, Id).
+
+'/actions/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
+    handle_disable_enable(?ROOT_KEY_ACTIONS, Id, Enable).
+
+'/actions/:id/:operation'(post, #{
+    bindings :=
+        #{id := Id, operation := Op}
+}) ->
+    handle_operation(?ROOT_KEY_ACTIONS, Id, Op).
+
+'/nodes/:node/actions/:id/:operation'(post, #{
+    bindings :=
+        #{id := Id, operation := Op, node := Node}
+}) ->
+    handle_node_operation(?ROOT_KEY_ACTIONS, Node, Id, Op).
+
+'/actions_probe'(post, Request) ->
+    handle_probe(?ROOT_KEY_ACTIONS, Request).
+
+'/action_types'(get, _Request) ->
+    ?OK(emqx_bridge_v2_schema:types()).
+
+%%------------------------------------------------------------------------------
+%% Handlers
+%%------------------------------------------------------------------------------
+
+handle_list(ConfRootKey) ->
+    Nodes = emqx:running_nodes(),
+    NodeReplies = emqx_bridge_proto_v6:v2_list_bridges_on_nodes_v6(Nodes, ConfRootKey),
     case is_ok(NodeReplies) of
         {ok, NodeBridges} ->
             AllBridges = [
@@ -414,34 +459,44 @@ schema("/action_types") ->
             ?INTERNAL_ERROR(Reason)
     end.
 
-'/actions/:id'(get, #{bindings := #{id := Id}}) ->
-    ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
-'/actions/:id'(put, #{bindings := #{id := Id}, body := Conf0}) ->
+handle_create(ConfRootKey, Type, Name, Conf0) ->
+    case emqx_bridge_v2:lookup(ConfRootKey, Type, Name) of
+        {ok, _} ->
+            ?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>);
+        {error, not_found} ->
+            Conf = filter_out_request_body(Conf0),
+            create_bridge(ConfRootKey, Type, Name, Conf)
+    end.
+
+handle_update(ConfRootKey, Id, Conf0) ->
     Conf1 = filter_out_request_body(Conf0),
     ?TRY_PARSE_ID(
         Id,
-        case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
+        case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
             {ok, _} ->
                 RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
                 Conf = emqx_utils:deobfuscate(Conf1, RawConf),
-                update_bridge(BridgeType, BridgeName, Conf);
+                update_bridge(ConfRootKey, BridgeType, BridgeName, Conf);
             {error, not_found} ->
                 ?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
         end
-    );
-'/actions/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) ->
+    ).
+
+handle_delete(ConfRootKey, Id, QueryStringOpts) ->
     ?TRY_PARSE_ID(
         Id,
         case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
             {ok, _} ->
                 AlsoDeleteActions =
-                    case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of
+                    case maps:get(<<"also_delete_dep_actions">>, QueryStringOpts, <<"false">>) of
                         <<"true">> -> true;
                         true -> true;
                         _ -> false
                     end,
                 case
-                    emqx_bridge_v2:check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions)
+                    emqx_bridge_v2:check_deps_and_remove(
+                        ConfRootKey, BridgeType, BridgeName, AlsoDeleteActions
+                    )
                 of
                     ok ->
                         ?NO_CONTENT;
@@ -465,23 +520,22 @@ schema("/action_types") ->
         end
     ).
 
-'/actions/:id/metrics'(get, #{bindings := #{id := Id}}) ->
-    ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(BridgeType, BridgeName)).
-
-'/actions/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
+handle_reset_metrics(ConfRootKey, Id) ->
     ?TRY_PARSE_ID(
         Id,
         begin
             ActionType = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType),
-            ok = emqx_bridge_v2:reset_metrics(ActionType, BridgeName),
+            ok = emqx_bridge_v2:reset_metrics(ConfRootKey, ActionType, BridgeName),
             ?NO_CONTENT
         end
     ).
 
-'/actions/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
+handle_disable_enable(ConfRootKey, Id, Enable) ->
     ?TRY_PARSE_ID(
         Id,
-        case emqx_bridge_v2:disable_enable(enable_func(Enable), BridgeType, BridgeName) of
+        case
+            emqx_bridge_v2:disable_enable(ConfRootKey, enable_func(Enable), BridgeType, BridgeName)
+        of
             {ok, _} ->
                 ?NO_CONTENT;
             {error, {pre_config_update, _, bridge_not_found}} ->
@@ -495,41 +549,37 @@ schema("/action_types") ->
         end
     ).
 
-'/actions/:id/:operation'(post, #{
-    bindings :=
-        #{id := Id, operation := Op}
-}) ->
+handle_operation(ConfRootKey, Id, Op) ->
     ?TRY_PARSE_ID(
         Id,
         begin
             OperFunc = operation_func(all, Op),
-            Nodes = mria:running_nodes(),
-            call_operation_if_enabled(all, OperFunc, [Nodes, BridgeType, BridgeName])
+            Nodes = emqx:running_nodes(),
+            call_operation_if_enabled(all, OperFunc, [Nodes, ConfRootKey, BridgeType, BridgeName])
         end
     ).
 
-'/nodes/:node/actions/:id/:operation'(post, #{
-    bindings :=
-        #{id := Id, operation := Op, node := Node}
-}) ->
+handle_node_operation(ConfRootKey, Node, Id, Op) ->
     ?TRY_PARSE_ID(
         Id,
         case emqx_utils:safe_to_existing_atom(Node, utf8) of
             {ok, TargetNode} ->
                 OperFunc = operation_func(TargetNode, Op),
-                call_operation_if_enabled(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]);
+                call_operation_if_enabled(TargetNode, OperFunc, [
+                    TargetNode, ConfRootKey, BridgeType, BridgeName
+                ]);
             {error, _} ->
                 ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>)
         end
     ).
 
-'/actions_probe'(post, Request) ->
+handle_probe(ConfRootKey, Request) ->
     RequestMeta = #{module => ?MODULE, method => post, path => "/actions_probe"},
     case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
-        {ok, #{body := #{<<"type">> := ConnType} = Params}} ->
+        {ok, #{body := #{<<"type">> := Type} = Params}} ->
             Params1 = maybe_deobfuscate_bridge_probe(Params),
             Params2 = maps:remove(<<"type">>, Params1),
-            case emqx_bridge_v2:create_dry_run(ConnType, Params2) of
+            case emqx_bridge_v2:create_dry_run(ConfRootKey, Type, Params2) of
                 ok ->
                     ?NO_CONTENT;
                 {error, #{kind := validation_error} = Reason0} ->
@@ -548,9 +598,7 @@ schema("/action_types") ->
             redact(BadRequest)
     end.
 
-'/action_types'(get, _Request) ->
-    ?OK(emqx_bridge_v2_schema:types()).
-
+%%% API helpers
 maybe_deobfuscate_bridge_probe(#{<<"type">> := ActionType, <<"name">> := BridgeName} = Params) ->
     case emqx_bridge_v2:lookup(ActionType, BridgeName) of
         {ok, #{raw_config := RawConf}} ->
@@ -564,7 +612,6 @@ maybe_deobfuscate_bridge_probe(#{<<"type">> := ActionType, <<"name">> := BridgeN
 maybe_deobfuscate_bridge_probe(Params) ->
     Params.
 
-%%% API helpers
 is_ok(ok) ->
     ok;
 is_ok(OkResult = {ok, _}) ->
@@ -587,9 +634,16 @@ is_ok(ResL) ->
     end.
 
 %% bridge helpers
-lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
-    Nodes = mria:running_nodes(),
-    case is_ok(emqx_bridge_proto_v5:v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
+-spec lookup_from_all_nodes(emqx_bridge_v2:root_cfg_key(), _, _, _) -> _.
+lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) ->
+    Nodes = emqx:running_nodes(),
+    case
+        is_ok(
+            emqx_bridge_proto_v6:v2_lookup_from_all_nodes_v6(
+                Nodes, ConfRootKey, BridgeType, BridgeName
+            )
+        )
+    of
         {ok, [{ok, _} | _] = Results} ->
             {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
         {ok, [{error, not_found} | _]} ->
@@ -598,10 +652,10 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
             ?INTERNAL_ERROR(Reason)
     end.
 
-get_metrics_from_all_nodes(ActionType, ActionName) ->
+get_metrics_from_all_nodes(ConfRootKey, Type, Name) ->
     Nodes = emqx:running_nodes(),
     Result = maybe_unwrap(
-        emqx_bridge_proto_v5:v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName)
+        emqx_bridge_proto_v6:v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, Type, Name)
     ),
     case Result of
         Metrics when is_list(Metrics) ->
@@ -610,22 +664,25 @@ get_metrics_from_all_nodes(ActionType, ActionName) ->
             ?INTERNAL_ERROR(Reason)
     end.
 
-operation_func(all, start) -> v2_start_bridge_to_all_nodes;
-operation_func(_Node, start) -> v2_start_bridge_to_node.
+operation_func(all, start) -> v2_start_bridge_to_all_nodes_v6;
+operation_func(_Node, start) -> v2_start_bridge_to_node_v6;
+operation_func(all, lookup) -> v2_lookup_from_all_nodes_v6;
+operation_func(all, list) -> v2_list_bridges_on_nodes_v6;
+operation_func(all, get_metrics) -> v2_get_metrics_from_all_nodes_v6.
 
-call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName]) ->
-    try is_enabled_bridge(BridgeType, BridgeName) of
+call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType, BridgeName]) ->
+    try is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) of
         false ->
             ?BRIDGE_NOT_ENABLED;
         true ->
-            call_operation(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName])
+            call_operation(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType, BridgeName])
     catch
         throw:not_found ->
             ?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
     end.
 
-is_enabled_bridge(BridgeType, BridgeName) ->
-    try emqx_bridge_v2:lookup(BridgeType, binary_to_existing_atom(BridgeName)) of
+is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) ->
+    try emqx_bridge_v2:lookup(ConfRootKey, BridgeType, binary_to_existing_atom(BridgeName)) of
         {ok, #{raw_config := ConfMap}} ->
             maps:get(<<"enable">>, ConfMap, false);
         {error, not_found} ->
@@ -637,7 +694,7 @@ is_enabled_bridge(BridgeType, BridgeName) ->
             throw(not_found)
     end.
 
-call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
+call_operation(NodeOrAll, OperFunc, Args = [_Nodes, _ConfRootKey, BridgeType, BridgeName]) ->
     case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
         Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
             ?NO_CONTENT;
@@ -668,12 +725,12 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
 
 do_bpapi_call(all, Call, Args) ->
     maybe_unwrap(
-        do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args)
+        do_bpapi_call_vsn(emqx_bpapi:supported_version(?BPAPI_NAME), Call, Args)
     );
 do_bpapi_call(Node, Call, Args) ->
-    case lists:member(Node, mria:running_nodes()) of
+    case lists:member(Node, emqx:running_nodes()) of
         true ->
-            do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args);
+            do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, ?BPAPI_NAME), Call, Args);
         false ->
             {error, {node_not_found, Node}}
     end.
@@ -681,7 +738,7 @@ do_bpapi_call(Node, Call, Args) ->
 do_bpapi_call_vsn(Version, Call, Args) ->
     case is_supported_version(Version, Call) of
         true ->
-            apply(emqx_bridge_proto_v5, Call, Args);
+            apply(emqx_bridge_proto_v6, Call, Args);
         false ->
             {error, not_implemented}
     end.
@@ -689,7 +746,12 @@ do_bpapi_call_vsn(Version, Call, Args) ->
 is_supported_version(Version, Call) ->
     lists:member(Version, supported_versions(Call)).
 
-supported_versions(_Call) -> [5].
+supported_versions(_Call) -> bpapi_version_range(6, latest).
+
+%% [From, To] (inclusive on both ends)
+bpapi_version_range(From, latest) ->
+    ThisNodeVsn = emqx_bpapi:supported_version(node(), ?BPAPI_NAME),
+    lists:seq(From, ThisNodeVsn).
 
 maybe_unwrap({error, not_implemented}) ->
     {error, not_implemented};
@@ -767,10 +829,22 @@ lookup_from_local_node(BridgeType, BridgeName) ->
         Error -> Error
     end.
 
+%% RPC Target
+-spec lookup_from_local_node_v6(emqx_bridge_v2:root_cfg_key(), _, _) -> _.
+lookup_from_local_node_v6(ConfRootKey, BridgeType, BridgeName) ->
+    case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
+        {ok, Res} -> {ok, format_resource(Res, node())};
+        Error -> Error
+    end.
+
 %% RPC Target
 get_metrics_from_local_node(ActionType, ActionName) ->
     format_metrics(emqx_bridge_v2:get_metrics(ActionType, ActionName)).
 
+%% RPC Target
+get_metrics_from_local_node_v6(ConfRootKey, Type, Name) ->
+    format_metrics(emqx_bridge_v2:get_metrics(ConfRootKey, Type, Name)).
+
 %% resource
 format_resource(
     #{
@@ -938,13 +1012,13 @@ format_resource_data(error, Error, Result) ->
 format_resource_data(K, V, Result) ->
     Result#{K => V}.
 
-create_bridge(BridgeType, BridgeName, Conf) ->
-    create_or_update_bridge(BridgeType, BridgeName, Conf, 201).
+create_bridge(ConfRootKey, BridgeType, BridgeName, Conf) ->
+    create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, 201).
 
-update_bridge(BridgeType, BridgeName, Conf) ->
-    create_or_update_bridge(BridgeType, BridgeName, Conf, 200).
+update_bridge(ConfRootKey, BridgeType, BridgeName, Conf) ->
+    create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, 200).
 
-create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
+create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, HttpStatusCode) ->
     Check =
         try
             is_binary(BridgeType) andalso emqx_resource:validate_type(BridgeType),
@@ -955,15 +1029,15 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
         end,
     case Check of
         ok ->
-            do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode);
+            do_create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, HttpStatusCode);
         BadRequest ->
             BadRequest
     end.
 
-do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
-    case emqx_bridge_v2:create(BridgeType, BridgeName, Conf) of
+do_create_or_update_bridge(ConfRootKey, BridgeType, BridgeName, Conf, HttpStatusCode) ->
+    case emqx_bridge_v2:create(ConfRootKey, BridgeType, BridgeName, Conf) of
         {ok, _} ->
-            lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode);
+            lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, HttpStatusCode);
         {error, {PreOrPostConfigUpdate, _HandlerMod, Reason}} when
             PreOrPostConfigUpdate =:= pre_config_update;
             PreOrPostConfigUpdate =:= post_config_update

+ 196 - 0
apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl

@@ -0,0 +1,196 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_proto_v6).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    list_bridges_on_nodes/1,
+    restart_bridge_to_node/3,
+    start_bridge_to_node/3,
+    stop_bridge_to_node/3,
+    lookup_from_all_nodes/3,
+    get_metrics_from_all_nodes/3,
+    restart_bridges_to_all_nodes/3,
+    start_bridges_to_all_nodes/3,
+    stop_bridges_to_all_nodes/3,
+
+    %% introduced in v6
+    v2_lookup_from_all_nodes_v6/4,
+    v2_list_bridges_on_nodes_v6/2,
+    v2_get_metrics_from_all_nodes_v6/4,
+    v2_start_bridge_to_node_v6/4,
+    v2_start_bridge_to_all_nodes_v6/4
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.5.0".
+
+-spec list_bridges_on_nodes([node()]) ->
+    emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
+list_bridges_on_nodes(Nodes) ->
+    erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
+
+-type key() :: atom() | binary() | [byte()].
+
+-spec restart_bridge_to_node(node(), key(), key()) ->
+    term().
+restart_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        restart,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec start_bridge_to_node(node(), key(), key()) ->
+    term().
+start_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        start,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec stop_bridge_to_node(node(), key(), key()) ->
+    term().
+stop_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        stop,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall(ok).
+restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        restart,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec start_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall(ok).
+start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        start,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall(ok).
+stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        stop,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec lookup_from_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall(term()).
+lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_api,
+        lookup_from_local_node,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec get_metrics_from_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()).
+get_metrics_from_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_api,
+        get_metrics_from_local_node,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+%%--------------------------------------------------------------------------------
+%% introduced in v6
+%%--------------------------------------------------------------------------------
+
+%% V2 Calls
+-spec v2_lookup_from_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) ->
+    emqx_rpc:erpc_multicall(term()).
+v2_lookup_from_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_v2_api,
+        lookup_from_local_node_v6,
+        [ConfRootKey, BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec v2_list_bridges_on_nodes_v6([node()], emqx_bridge_v2:root_cfg_key()) ->
+    emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
+v2_list_bridges_on_nodes_v6(Nodes, ConfRootKey) ->
+    erpc:multicall(Nodes, emqx_bridge_v2, list, [ConfRootKey], ?TIMEOUT).
+
+-spec v2_get_metrics_from_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) ->
+    emqx_rpc:erpc_multicall(term()).
+v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, ActionType, ActionName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_v2_api,
+        get_metrics_from_local_node_v6,
+        [ConfRootKey, ActionType, ActionName],
+        ?TIMEOUT
+    ).
+
+-spec v2_start_bridge_to_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) ->
+    emqx_rpc:erpc_multicall(ok).
+v2_start_bridge_to_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_v2,
+        start,
+        [ConfRootKey, BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec v2_start_bridge_to_node_v6(node(), emqx_bridge_v2:root_cfg_key(), key(), key()) ->
+    term().
+v2_start_bridge_to_node_v6(Node, ConfRootKey, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_v2,
+        start,
+        [ConfRootKey, BridgeType, BridgeName],
+        ?TIMEOUT
+    ).

+ 4 - 3
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -160,8 +160,9 @@ end_per_group(_, Config) ->
 
 init_per_testcase(t_broken_bpapi_vsn, Config) ->
     meck:new(emqx_bpapi, [passthrough]),
-    meck:expect(emqx_bpapi, supported_version, 1, -1),
     meck:expect(emqx_bpapi, supported_version, 2, -1),
+    meck:new(emqx_bridge_api, [passthrough]),
+    meck:expect(emqx_bridge_api, supported_versions, 1, []),
     init_per_testcase(common, Config);
 init_per_testcase(t_old_bpapi_vsn, Config) ->
     meck:new(emqx_bpapi, [passthrough]),
@@ -173,10 +174,10 @@ init_per_testcase(_, Config) ->
     [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
 
 end_per_testcase(t_broken_bpapi_vsn, Config) ->
-    meck:unload([emqx_bpapi]),
+    meck:unload(),
     end_per_testcase(common, Config);
 end_per_testcase(t_old_bpapi_vsn, Config) ->
-    meck:unload([emqx_bpapi]),
+    meck:unload(),
     end_per_testcase(common, Config);
 end_per_testcase(_, Config) ->
     Sock = ?config(sock, Config),

+ 3 - 13
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -196,20 +196,10 @@ delete_bridge_http_api_v1(Opts) ->
 op_bridge_api(Op, BridgeType, BridgeName) ->
     BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
     Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]),
-    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    Opts = #{return_all => true},
     ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
-    Res =
-        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
-            {ok, {Status = {_, 204, _}, Headers, Body}} ->
-                {ok, {Status, Headers, Body}};
-            {ok, {Status, Headers, Body}} ->
-                {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
-            {error, {Status, Headers, Body}} ->
-                {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
-            Error ->
-                Error
-        end,
+    Method = post,
+    Params = [],
+    Res = emqx_bridge_v2_testlib:request(Method, Path, Params),
     ct:pal("bridge op result: ~p", [Res]),
     Res.
 

+ 4 - 14
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -308,21 +308,11 @@ update_bridge_api(Config, Overrides) ->
 op_bridge_api(Op, BridgeType, BridgeName) ->
     BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
     Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId, Op]),
-    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    Opts = #{return_all => true},
     ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
-    Res =
-        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
-            {ok, {Status = {_, 204, _}, Headers, Body}} ->
-                {ok, {Status, Headers, Body}};
-            {ok, {Status, Headers, Body}} ->
-                {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
-            {error, {Status, Headers, Body}} ->
-                {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
-            Error ->
-                Error
-        end,
-    ct:pal("bridge op result: ~p", [Res]),
+    Method = post,
+    Params = [],
+    Res = request(Method, Path, Params),
+    ct:pal("bridge op result:\n  ~p", [Res]),
     Res.
 
 probe_bridge_api(Config) ->