Explorar o código

feat: add `/sources*` HTTP APIs

Thales Macedo Garitezi %!s(int64=2) %!d(string=hai) anos
pai
achega
28de7c89c7

+ 15 - 5
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -23,6 +23,7 @@
     bridge_to_resource_type/1,
     resource_id/1,
     resource_id/2,
+    resource_id/3,
     bridge_id/2,
     parse_bridge_id/1,
     parse_bridge_id/2,
@@ -62,6 +63,9 @@
         ?IS_BI_DIR_BRIDGE(TYPE)
 ).
 
+-define(ROOT_KEY_ACTIONS, actions).
+-define(ROOT_KEY_SOURCES, sources).
+
 -if(?EMQX_RELEASE_EDITION == ee).
 bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
     bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
@@ -85,11 +89,21 @@ bridge_impl_module(_BridgeType) -> undefined.
 -endif.
 
 resource_id(BridgeId) when is_binary(BridgeId) ->
+    resource_id_for_kind(?ROOT_KEY_ACTIONS, BridgeId).
+
+resource_id(BridgeType, BridgeName) ->
+    resource_id(?ROOT_KEY_ACTIONS, BridgeType, BridgeName).
+
+resource_id(ConfRootKey, BridgeType, BridgeName) ->
+    BridgeId = bridge_id(BridgeType, BridgeName),
+    resource_id_for_kind(ConfRootKey, BridgeId).
+
+resource_id_for_kind(ConfRootKey, BridgeId) when is_binary(BridgeId) ->
     case binary:split(BridgeId, <<":">>) of
         [Type, _Name] ->
             case emqx_bridge_v2:is_bridge_v2_type(Type) of
                 true ->
-                    emqx_bridge_v2:bridge_v1_id_to_connector_resource_id(BridgeId);
+                    emqx_bridge_v2:bridge_v1_id_to_connector_resource_id(ConfRootKey, BridgeId);
                 false ->
                     <<"bridge:", BridgeId/binary>>
             end;
@@ -97,10 +111,6 @@ resource_id(BridgeId) when is_binary(BridgeId) ->
             invalid_data(<<"should be of pattern {type}:{name}, but got ", BridgeId/binary>>)
     end.
 
-resource_id(BridgeType, BridgeName) ->
-    BridgeId = bridge_id(BridgeType, BridgeName),
-    resource_id(BridgeId).
-
 bridge_id(BridgeType, BridgeName) ->
     Name = bin(BridgeName),
     Type = bin(BridgeType),

+ 17 - 6
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -91,6 +91,7 @@
     id/2,
     id/3,
     bridge_v1_is_valid/2,
+    bridge_v1_is_valid/3,
     extract_connector_id_from_bridge_v2_id/1
 ]).
 
@@ -128,6 +129,7 @@
     %% Exception from the naming convention:
     bridge_v2_type_to_bridge_v1_type/2,
     bridge_v1_id_to_connector_resource_id/1,
+    bridge_v1_id_to_connector_resource_id/2,
     bridge_v1_enable_disable/3,
     bridge_v1_restart/2,
     bridge_v1_stop/2,
@@ -567,7 +569,7 @@ connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, DoHe
         ConfRootKey,
         BridgeV2Type,
         Name,
-        lookup_conf(BridgeV2Type, Name),
+        lookup_conf(ConfRootKey, BridgeV2Type, Name),
         ConnectorOpFun,
         DoHealthCheck
     ).
@@ -1191,8 +1193,11 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
 %% * The corresponding bridge v2 should exist
 %% * The connector for the bridge v2 should have exactly one channel
 bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
+    bridge_v1_is_valid(?ROOT_KEY_ACTIONS, BridgeV1Type, BridgeName).
+
+bridge_v1_is_valid(ConfRootKey, BridgeV1Type, BridgeName) ->
     BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
-    case lookup_conf(BridgeV2Type, BridgeName) of
+    case lookup_conf(ConfRootKey, BridgeV2Type, BridgeName) of
         {error, _} ->
             %% If the bridge v2 does not exist, it is a valid bridge v1
             true;
@@ -1241,17 +1246,20 @@ bridge_v1_list_and_transform() ->
 
 bridge_v1_lookup_and_transform(ActionType, Name) ->
     case lookup_actions_or_sources(ActionType, Name) of
-        {ok, ConfRootName,
+        {ok, ConfRootKey,
             #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
             BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
             HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType),
-            case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
+            case
+                HasBridgeV1Equivalent andalso
+                    ?MODULE:bridge_v1_is_valid(ConfRootKey, BridgeV1Type, Name)
+            of
                 true ->
                     ConnectorType = connector_type(ActionType),
                     case emqx_connector:lookup(ConnectorType, ConnectorName) of
                         {ok, Connector} ->
                             bridge_v1_lookup_and_transform_helper(
-                                ConfRootName,
+                                ConfRootKey,
                                 BridgeV1Type,
                                 Name,
                                 ActionType,
@@ -1718,11 +1726,14 @@ connector_has_channels(BridgeV2Type, ConnectorName) ->
     end.
 
 bridge_v1_id_to_connector_resource_id(BridgeId) ->
+    bridge_v1_id_to_connector_resource_id(?ROOT_KEY_ACTIONS, BridgeId).
+
+bridge_v1_id_to_connector_resource_id(ConfRootKey, BridgeId) ->
     case binary:split(BridgeId, <<":">>) of
         [Type, Name] ->
             BridgeV2Type = bin(bridge_v1_type_to_bridge_v2_type(Type)),
             ConnectorName =
-                case lookup_conf(BridgeV2Type, Name) of
+                case lookup_conf(ConfRootKey, BridgeV2Type, Name) of
                     #{connector := Con} ->
                         Con;
                     {error, Reason} ->

+ 346 - 38
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -37,7 +37,7 @@
     namespace/0
 ]).
 
-%% API callbacks
+%% API callbacks : actions
 -export([
     '/actions'/2,
     '/actions/:id'/2,
@@ -49,6 +49,18 @@
     '/actions_probe'/2,
     '/action_types'/2
 ]).
+%% API callbacks : sources
+-export([
+    '/sources'/2,
+    '/sources/:id'/2,
+    '/sources/:id/metrics'/2,
+    '/sources/:id/metrics/reset'/2,
+    '/sources/:id/enable/:enable'/2,
+    '/sources/:id/:operation'/2,
+    '/nodes/:node/sources/:id/:operation'/2,
+    '/sources_probe'/2,
+    '/source_types'/2
+]).
 
 %% BpAPI / RPC Targets
 -export([
@@ -81,13 +93,16 @@
     end
 ).
 
-namespace() -> "actions".
+namespace() -> "actions_and_sources".
 
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
 
 paths() ->
     [
+        %%=============
+        %% Actions
+        %%=============
         "/actions",
         "/actions/:id",
         "/actions/:id/enable/:enable",
@@ -98,7 +113,21 @@ paths() ->
         "/actions/:id/metrics",
         "/actions/:id/metrics/reset",
         "/actions_probe",
-        "/action_types"
+        "/action_types",
+        %%=============
+        %% Sources
+        %%=============
+        "/sources",
+        "/sources/:id",
+        "/sources/:id/enable/:enable",
+        "/sources/:id/:operation",
+        "/nodes/:node/sources/:id/:operation",
+        %% %% Caveat: metrics paths must come *after* `/:operation', otherwise minirest will
+        %% %% try to match the latter first, trying to interpret `metrics' as an operation...
+        "/sources/:id/metrics",
+        "/sources/:id/metrics/reset",
+        "/sources_probe"
+        %% "/source_types"
     ].
 
 error_schema(Code, Message) ->
@@ -111,17 +140,28 @@ error_schema(Codes, Message, ExtraFields) when is_list(Message) ->
 error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(Message) ->
     ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
 
-get_response_body_schema() ->
+actions_get_response_body_schema() ->
+    emqx_dashboard_swagger:schema_with_examples(
+        emqx_bridge_v2_schema:actions_get_response(),
+        bridge_info_examples(get, ?ROOT_KEY_ACTIONS)
+    ).
+
+sources_get_response_body_schema() ->
     emqx_dashboard_swagger:schema_with_examples(
-        emqx_bridge_v2_schema:get_response(),
-        bridge_info_examples(get)
+        emqx_bridge_v2_schema:sources_get_response(),
+        bridge_info_examples(get, ?ROOT_KEY_SOURCES)
     ).
 
-bridge_info_examples(Method) ->
-    emqx_bridge_v2_schema:examples(Method).
+bridge_info_examples(Method, ?ROOT_KEY_ACTIONS) ->
+    emqx_bridge_v2_schema:actions_examples(Method);
+bridge_info_examples(Method, ?ROOT_KEY_SOURCES) ->
+    emqx_bridge_v2_schema:sources_examples(Method).
 
-bridge_info_array_example(Method) ->
-    lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))).
+bridge_info_array_example(Method, ConfRootKey) ->
+    lists:map(
+        fun(#{value := Config}) -> Config end,
+        maps:values(bridge_info_examples(Method, ConfRootKey))
+    ).
 
 param_path_id() ->
     {id,
@@ -195,6 +235,9 @@ param_path_enable() ->
             }
         )}.
 
+%%================================================================================
+%% Actions
+%%================================================================================
 schema("/actions") ->
     #{
         'operationId' => '/actions',
@@ -204,8 +247,8 @@ schema("/actions") ->
             description => ?DESC("desc_api1"),
             responses => #{
                 200 => emqx_dashboard_swagger:schema_with_example(
-                    array(emqx_bridge_v2_schema:get_response()),
-                    bridge_info_array_example(get)
+                    array(emqx_bridge_v2_schema:actions_get_response()),
+                    bridge_info_array_example(get, ?ROOT_KEY_ACTIONS)
                 )
             }
         },
@@ -214,11 +257,11 @@ schema("/actions") ->
             summary => <<"Create bridge">>,
             description => ?DESC("desc_api2"),
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_bridge_v2_schema:post_request(),
-                bridge_info_examples(post)
+                emqx_bridge_v2_schema:actions_post_request(),
+                bridge_info_examples(post, ?ROOT_KEY_ACTIONS)
             ),
             responses => #{
-                201 => get_response_body_schema(),
+                201 => actions_get_response_body_schema(),
                 400 => error_schema('ALREADY_EXISTS', "Bridge already exists")
             }
         }
@@ -232,7 +275,7 @@ schema("/actions/:id") ->
             description => ?DESC("desc_api3"),
             parameters => [param_path_id()],
             responses => #{
-                200 => get_response_body_schema(),
+                200 => actions_get_response_body_schema(),
                 404 => error_schema('NOT_FOUND', "Bridge not found")
             }
         },
@@ -242,11 +285,11 @@ schema("/actions/:id") ->
             description => ?DESC("desc_api4"),
             parameters => [param_path_id()],
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_bridge_v2_schema:put_request(),
-                bridge_info_examples(put)
+                emqx_bridge_v2_schema:actions_put_request(),
+                bridge_info_examples(put, ?ROOT_KEY_ACTIONS)
             ),
             responses => #{
-                200 => get_response_body_schema(),
+                200 => actions_get_response_body_schema(),
                 404 => error_schema('NOT_FOUND', "Bridge not found"),
                 400 => error_schema('BAD_REQUEST', "Update bridge failed")
             }
@@ -371,8 +414,8 @@ schema("/actions_probe") ->
             desc => ?DESC("desc_api9"),
             summary => <<"Test creating bridge">>,
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_bridge_v2_schema:post_request(),
-                bridge_info_examples(post)
+                emqx_bridge_v2_schema:actions_post_request(),
+                bridge_info_examples(post, ?ROOT_KEY_ACTIONS)
             ),
             responses => #{
                 204 => <<"Test bridge OK">>,
@@ -389,12 +432,223 @@ schema("/action_types") ->
             summary => <<"List available action types">>,
             responses => #{
                 200 => emqx_dashboard_swagger:schema_with_examples(
-                    array(emqx_bridge_v2_schema:types_sc()),
+                    array(emqx_bridge_v2_schema:action_types_sc()),
                     #{
                         <<"types">> =>
                             #{
                                 summary => <<"Action types">>,
-                                value => emqx_bridge_v2_schema:types()
+                                value => emqx_bridge_v2_schema:action_types()
+                            }
+                    }
+                )
+            }
+        }
+    };
+%%================================================================================
+%% Sources
+%%================================================================================
+schema("/sources") ->
+    #{
+        'operationId' => '/sources',
+        get => #{
+            tags => [<<"sources">>],
+            summary => <<"List sources">>,
+            description => ?DESC("desc_api1"),
+            responses => #{
+                %% FIXME: examples
+                200 => emqx_dashboard_swagger:schema_with_example(
+                    array(emqx_bridge_v2_schema:sources_get_response()),
+                    bridge_info_array_example(get, ?ROOT_KEY_SOURCES)
+                )
+            }
+        },
+        post => #{
+            tags => [<<"sources">>],
+            summary => <<"Create source">>,
+            description => ?DESC("desc_api2"),
+            %% FIXME: examples
+            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                emqx_bridge_v2_schema:sources_post_request(),
+                bridge_info_examples(post, ?ROOT_KEY_SOURCES)
+            ),
+            responses => #{
+                201 => sources_get_response_body_schema(),
+                400 => error_schema('ALREADY_EXISTS', "Source already exists")
+            }
+        }
+    };
+schema("/sources/:id") ->
+    #{
+        'operationId' => '/sources/:id',
+        get => #{
+            tags => [<<"sources">>],
+            summary => <<"Get source">>,
+            description => ?DESC("desc_api3"),
+            parameters => [param_path_id()],
+            responses => #{
+                200 => sources_get_response_body_schema(),
+                404 => error_schema('NOT_FOUND', "Source not found")
+            }
+        },
+        put => #{
+            tags => [<<"sources">>],
+            summary => <<"Update source">>,
+            description => ?DESC("desc_api4"),
+            parameters => [param_path_id()],
+            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                emqx_bridge_v2_schema:sources_put_request(),
+                bridge_info_examples(put, ?ROOT_KEY_SOURCES)
+            ),
+            responses => #{
+                200 => sources_get_response_body_schema(),
+                404 => error_schema('NOT_FOUND', "Source not found"),
+                400 => error_schema('BAD_REQUEST', "Update source failed")
+            }
+        },
+        delete => #{
+            tags => [<<"sources">>],
+            summary => <<"Delete source">>,
+            description => ?DESC("desc_api5"),
+            parameters => [param_path_id(), param_qs_delete_cascade()],
+            responses => #{
+                204 => <<"Source deleted">>,
+                400 => error_schema(
+                    'BAD_REQUEST',
+                    "Cannot delete bridge while active rules are defined for this source",
+                    [{rules, mk(array(string()), #{desc => "Dependent Rule IDs"})}]
+                ),
+                404 => error_schema('NOT_FOUND', "Source not found"),
+                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
+            }
+        }
+    };
+schema("/sources/:id/metrics") ->
+    #{
+        'operationId' => '/sources/:id/metrics',
+        get => #{
+            tags => [<<"sources">>],
+            summary => <<"Get source metrics">>,
+            description => ?DESC("desc_bridge_metrics"),
+            parameters => [param_path_id()],
+            responses => #{
+                200 => emqx_bridge_schema:metrics_fields(),
+                404 => error_schema('NOT_FOUND', "Source not found")
+            }
+        }
+    };
+schema("/sources/:id/metrics/reset") ->
+    #{
+        'operationId' => '/sources/:id/metrics/reset',
+        put => #{
+            tags => [<<"sources">>],
+            summary => <<"Reset source metrics">>,
+            description => ?DESC("desc_api6"),
+            parameters => [param_path_id()],
+            responses => #{
+                204 => <<"Reset success">>,
+                404 => error_schema('NOT_FOUND', "Source not found")
+            }
+        }
+    };
+schema("/sources/:id/enable/:enable") ->
+    #{
+        'operationId' => '/sources/:id/enable/:enable',
+        put =>
+            #{
+                tags => [<<"sources">>],
+                summary => <<"Enable or disable bridge">>,
+                desc => ?DESC("desc_enable_bridge"),
+                parameters => [param_path_id(), param_path_enable()],
+                responses =>
+                    #{
+                        204 => <<"Success">>,
+                        404 => error_schema(
+                            'NOT_FOUND', "Bridge not found or invalid operation"
+                        ),
+                        503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
+                    }
+            }
+    };
+schema("/sources/:id/:operation") ->
+    #{
+        'operationId' => '/sources/:id/:operation',
+        post => #{
+            tags => [<<"sources">>],
+            summary => <<"Manually start a bridge">>,
+            description => ?DESC("desc_api7"),
+            parameters => [
+                param_path_id(),
+                param_path_operation_cluster()
+            ],
+            responses => #{
+                204 => <<"Operation success">>,
+                400 => error_schema(
+                    'BAD_REQUEST', "Problem with configuration of external service"
+                ),
+                404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"),
+                501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
+                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
+            }
+        }
+    };
+schema("/nodes/:node/sources/:id/:operation") ->
+    #{
+        'operationId' => '/nodes/:node/sources/:id/:operation',
+        post => #{
+            tags => [<<"sources">>],
+            summary => <<"Manually start a bridge on a given node">>,
+            description => ?DESC("desc_api8"),
+            parameters => [
+                param_path_node(),
+                param_path_id(),
+                param_path_operation_on_node()
+            ],
+            responses => #{
+                204 => <<"Operation success">>,
+                400 => error_schema(
+                    'BAD_REQUEST',
+                    "Problem with configuration of external service or bridge not enabled"
+                ),
+                404 => error_schema(
+                    'NOT_FOUND', "Bridge or node not found or invalid operation"
+                ),
+                501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
+                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
+            }
+        }
+    };
+schema("/sources_probe") ->
+    #{
+        'operationId' => '/sources_probe',
+        post => #{
+            tags => [<<"sources">>],
+            desc => ?DESC("desc_api9"),
+            summary => <<"Test creating bridge">>,
+            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                emqx_bridge_v2_schema:sources_post_request(),
+                bridge_info_examples(post, ?ROOT_KEY_SOURCES)
+            ),
+            responses => #{
+                204 => <<"Test bridge OK">>,
+                400 => error_schema(['TEST_FAILED'], "bridge test failed")
+            }
+        }
+    };
+schema("/source_types") ->
+    #{
+        'operationId' => '/source_types',
+        get => #{
+            tags => [<<"sources">>],
+            desc => ?DESC("desc_api10"),
+            summary => <<"List available source types">>,
+            responses => #{
+                200 => emqx_dashboard_swagger:schema_with_examples(
+                    array(emqx_bridge_v2_schema:action_types_sc()),
+                    #{
+                        <<"types">> =>
+                            #{
+                                summary => <<"Source types">>,
+                                value => emqx_bridge_v2_schema:action_types()
                             }
                     }
                 )
@@ -402,6 +656,12 @@ schema("/action_types") ->
         }
     }.
 
+%%------------------------------------------------------------------------------
+%% Thin Handlers
+%%------------------------------------------------------------------------------
+%%================================================================================
+%% Actions
+%%================================================================================
 '/actions'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
     handle_create(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, Conf0);
 '/actions'(get, _Params) ->
@@ -439,7 +699,48 @@ schema("/action_types") ->
     handle_probe(?ROOT_KEY_ACTIONS, Request).
 
 '/action_types'(get, _Request) ->
-    ?OK(emqx_bridge_v2_schema:types()).
+    ?OK(emqx_bridge_v2_schema:action_types()).
+%%================================================================================
+%% Sources
+%%================================================================================
+'/sources'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
+    handle_create(?ROOT_KEY_SOURCES, BridgeType, BridgeName, Conf0);
+'/sources'(get, _Params) ->
+    handle_list(?ROOT_KEY_SOURCES).
+
+'/sources/:id'(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(Id, lookup_from_all_nodes(?ROOT_KEY_SOURCES, BridgeType, BridgeName, 200));
+'/sources/:id'(put, #{bindings := #{id := Id}, body := Conf0}) ->
+    handle_update(?ROOT_KEY_SOURCES, Id, Conf0);
+'/sources/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) ->
+    handle_delete(?ROOT_KEY_SOURCES, Id, Qs).
+
+'/sources/:id/metrics'(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(?ROOT_KEY_SOURCES, BridgeType, BridgeName)).
+
+'/sources/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
+    handle_reset_metrics(?ROOT_KEY_SOURCES, Id).
+
+'/sources/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
+    handle_disable_enable(?ROOT_KEY_SOURCES, Id, Enable).
+
+'/sources/:id/:operation'(post, #{
+    bindings :=
+        #{id := Id, operation := Op}
+}) ->
+    handle_operation(?ROOT_KEY_SOURCES, Id, Op).
+
+'/nodes/:node/sources/:id/:operation'(post, #{
+    bindings :=
+        #{id := Id, operation := Op, node := Node}
+}) ->
+    handle_node_operation(?ROOT_KEY_SOURCES, Node, Id, Op).
+
+'/sources_probe'(post, Request) ->
+    handle_probe(?ROOT_KEY_SOURCES, Request).
+
+'/source_types'(get, _Request) ->
+    ?OK(emqx_bridge_v2_schema:source_types()).
 
 %%------------------------------------------------------------------------------
 %% Handlers
@@ -451,7 +752,7 @@ handle_list(ConfRootKey) ->
     case is_ok(NodeReplies) of
         {ok, NodeBridges} ->
             AllBridges = [
-                [format_resource(Data, Node) || Data <- Bridges]
+                [format_resource(ConfRootKey, Data, Node) || Data <- Bridges]
              || {Node, Bridges} <- lists:zip(Nodes, NodeBridges)
             ],
             ?OK(zip_bridges(AllBridges));
@@ -574,7 +875,12 @@ handle_node_operation(ConfRootKey, Node, Id, Op) ->
     ).
 
 handle_probe(ConfRootKey, Request) ->
-    RequestMeta = #{module => ?MODULE, method => post, path => "/actions_probe"},
+    Path =
+        case ConfRootKey of
+            ?ROOT_KEY_ACTIONS -> "/actions_probe";
+            ?ROOT_KEY_SOURCES -> "/sources_probe"
+        end,
+    RequestMeta = #{module => ?MODULE, method => post, path => Path},
     case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
         {ok, #{body := #{<<"type">> := Type} = Params}} ->
             Params1 = maybe_deobfuscate_bridge_probe(Params),
@@ -664,8 +970,8 @@ get_metrics_from_all_nodes(ConfRootKey, Type, Name) ->
             ?INTERNAL_ERROR(Reason)
     end.
 
-operation_func(all, start) -> v2_start_bridge_to_all_nodes_v6;
-operation_func(_Node, start) -> v2_start_bridge_to_node_v6;
+operation_func(all, start) -> v2_start_bridge_on_all_nodes_v6;
+operation_func(_Node, start) -> v2_start_bridge_on_node_v6;
 operation_func(all, lookup) -> v2_lookup_from_all_nodes_v6;
 operation_func(all, list) -> v2_list_bridges_on_nodes_v6;
 operation_func(all, get_metrics) -> v2_get_metrics_from_all_nodes_v6.
@@ -825,7 +1131,7 @@ aggregate_status(AllStatus) ->
 %% RPC Target
 lookup_from_local_node(BridgeType, BridgeName) ->
     case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
-        {ok, Res} -> {ok, format_resource(Res, node())};
+        {ok, Res} -> {ok, format_resource(?ROOT_KEY_ACTIONS, Res, node())};
         Error -> Error
     end.
 
@@ -833,7 +1139,7 @@ lookup_from_local_node(BridgeType, BridgeName) ->
 -spec lookup_from_local_node_v6(emqx_bridge_v2:root_cfg_key(), _, _) -> _.
 lookup_from_local_node_v6(ConfRootKey, BridgeType, BridgeName) ->
     case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
-        {ok, Res} -> {ok, format_resource(Res, node())};
+        {ok, Res} -> {ok, format_resource(ConfRootKey, Res, node())};
         Error -> Error
     end.
 
@@ -847,6 +1153,7 @@ get_metrics_from_local_node_v6(ConfRootKey, Type, Name) ->
 
 %% resource
 format_resource(
+    ConfRootKey,
     #{
         type := Type,
         name := Name,
@@ -857,7 +1164,7 @@ format_resource(
     },
     Node
 ) ->
-    RawConf = fill_defaults(Type, RawConf0),
+    RawConf = fill_defaults(ConfRootKey, Type, RawConf0),
     redact(
         maps:merge(
             RawConf#{
@@ -988,17 +1295,18 @@ aggregate_metrics(
         M17 + N17
     ).
 
-fill_defaults(Type, RawConf) ->
-    PackedConf = pack_bridge_conf(Type, RawConf),
+fill_defaults(ConfRootKey, Type, RawConf) ->
+    PackedConf = pack_bridge_conf(ConfRootKey, Type, RawConf),
     FullConf = emqx_config:fill_defaults(emqx_bridge_v2_schema, PackedConf, #{}),
-    unpack_bridge_conf(Type, FullConf).
+    unpack_bridge_conf(ConfRootKey, Type, FullConf).
 
-pack_bridge_conf(Type, RawConf) ->
-    #{<<"actions">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
+pack_bridge_conf(ConfRootKey, Type, RawConf) ->
+    #{bin(ConfRootKey) => #{bin(Type) => #{<<"foo">> => RawConf}}}.
 
-unpack_bridge_conf(Type, PackedConf) ->
+unpack_bridge_conf(ConfRootKey, Type, PackedConf) ->
+    ConfRootKeyBin = bin(ConfRootKey),
     TypeBin = bin(Type),
-    #{<<"actions">> := Bridges} = PackedConf,
+    #{ConfRootKeyBin := Bridges} = PackedConf,
     #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges),
     RawConf.
 

+ 6 - 6
apps/emqx_bridge/src/proto/emqx_bridge_proto_v6.erl

@@ -35,8 +35,8 @@
     v2_lookup_from_all_nodes_v6/4,
     v2_list_bridges_on_nodes_v6/2,
     v2_get_metrics_from_all_nodes_v6/4,
-    v2_start_bridge_to_node_v6/4,
-    v2_start_bridge_to_all_nodes_v6/4
+    v2_start_bridge_on_node_v6/4,
+    v2_start_bridge_on_all_nodes_v6/4
 ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -173,9 +173,9 @@ v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, ActionType, ActionName) ->
         ?TIMEOUT
     ).
 
--spec v2_start_bridge_to_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) ->
+-spec v2_start_bridge_on_all_nodes_v6([node()], emqx_bridge_v2:root_cfg_key(), key(), key()) ->
     emqx_rpc:erpc_multicall(ok).
-v2_start_bridge_to_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) ->
+v2_start_bridge_on_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) ->
     erpc:multicall(
         Nodes,
         emqx_bridge_v2,
@@ -184,9 +184,9 @@ v2_start_bridge_to_all_nodes_v6(Nodes, ConfRootKey, BridgeType, BridgeName) ->
         ?TIMEOUT
     ).
 
--spec v2_start_bridge_to_node_v6(node(), emqx_bridge_v2:root_cfg_key(), key(), key()) ->
+-spec v2_start_bridge_on_node_v6(node(), emqx_bridge_v2:root_cfg_key(), key(), key()) ->
     term().
-v2_start_bridge_to_node_v6(Node, ConfRootKey, BridgeType, BridgeName) ->
+v2_start_bridge_on_node_v6(Node, ConfRootKey, BridgeType, BridgeName) ->
     rpc:call(
         Node,
         emqx_bridge_v2,

+ 172 - 67
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -28,21 +28,31 @@
 -export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
 
 -export([
-    get_response/0,
-    put_request/0,
-    post_request/0,
-    examples/1,
+    actions_get_response/0,
+    actions_put_request/0,
+    actions_post_request/0,
+    actions_examples/1,
     action_values/4
 ]).
 
+-export([
+    sources_get_response/0,
+    sources_put_request/0,
+    sources_post_request/0,
+    sources_examples/1,
+    source_values/4
+]).
+
 %% Exported for mocking
 %% TODO: refactor emqx_bridge_v1_compatibility_layer_SUITE so we don't need to
 %% export this
 -export([
-    registered_api_schemas/1
+    registered_actions_api_schemas/1,
+    registered_sources_api_schemas/1
 ]).
 
--export([types/0, types_sc/0]).
+-export([action_types/0, action_types_sc/0]).
+-export([source_types/0, source_types_sc/0]).
 -export([resource_opts_fields/0, resource_opts_fields/1]).
 
 -export([
@@ -58,33 +68,140 @@
 
 -export([actions_convert_from_connectors/1]).
 
--export_type([action_type/0]).
+-export_type([action_type/0, source_type/0]).
 
 %% Should we explicitly list them here so dialyzer may be more helpful?
 -type action_type() :: atom().
+-type source_type() :: atom().
+-type http_method() :: get | post | put.
+-type schema_example_map() :: #{atom() => term()}.
 
 %%======================================================================================
 %% For HTTP APIs
-get_response() ->
-    api_schema("get").
+%%======================================================================================
+
+%%---------------------------------------------
+%% Actions
+%%---------------------------------------------
 
-put_request() ->
-    api_schema("put").
+actions_get_response() ->
+    actions_api_schema("get").
 
-post_request() ->
-    api_schema("post").
+actions_put_request() ->
+    actions_api_schema("put").
 
-api_schema(Method) ->
-    APISchemas = ?MODULE:registered_api_schemas(Method),
+actions_post_request() ->
+    actions_api_schema("post").
+
+actions_api_schema(Method) ->
+    APISchemas = ?MODULE:registered_actions_api_schemas(Method),
     hoconsc:union(bridge_api_union(APISchemas)).
 
-registered_api_schemas(Method) ->
+registered_actions_api_schemas(Method) ->
     RegisteredSchemas = emqx_action_info:registered_schema_modules_actions(),
     [
         api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2")
      || {BridgeV2Type, SchemaModule} <- RegisteredSchemas
     ].
 
+-spec action_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map().
+action_values(Method, ActionType, ConnectorType, ActionValues) ->
+    ActionTypeBin = atom_to_binary(ActionType),
+    ConnectorTypeBin = atom_to_binary(ConnectorType),
+    lists:foldl(
+        fun(M1, M2) ->
+            maps:merge(M1, M2)
+        end,
+        #{
+            enable => true,
+            description => <<"My example ", ActionTypeBin/binary, " action">>,
+            connector => <<ConnectorTypeBin/binary, "_connector">>,
+            resource_opts => #{
+                health_check_interval => "30s"
+            }
+        },
+        [
+            ActionValues,
+            method_values(action, Method, ActionType)
+        ]
+    ).
+
+actions_examples(Method) ->
+    MergeFun =
+        fun(Example, Examples) ->
+            maps:merge(Examples, Example)
+        end,
+    Fun =
+        fun(Module, Examples) ->
+            ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
+            lists:foldl(MergeFun, Examples, ConnectorExamples)
+        end,
+    SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_actions()],
+    lists:foldl(Fun, #{}, SchemaModules).
+
+%%---------------------------------------------
+%% Sources
+%%---------------------------------------------
+
+sources_get_response() ->
+    sources_api_schema("get").
+
+sources_put_request() ->
+    sources_api_schema("put").
+
+sources_post_request() ->
+    sources_api_schema("post").
+
+sources_api_schema(Method) ->
+    APISchemas = ?MODULE:registered_sources_api_schemas(Method),
+    hoconsc:union(bridge_api_union(APISchemas)).
+
+registered_sources_api_schemas(Method) ->
+    RegisteredSchemas = emqx_action_info:registered_schema_modules_sources(),
+    [
+        api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_source")
+     || {BridgeV2Type, SchemaModule} <- RegisteredSchemas
+    ].
+
+-spec source_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map().
+source_values(Method, SourceType, ConnectorType, SourceValues) ->
+    SourceTypeBin = atom_to_binary(SourceType),
+    ConnectorTypeBin = atom_to_binary(ConnectorType),
+    lists:foldl(
+        fun(M1, M2) ->
+            maps:merge(M1, M2)
+        end,
+        #{
+            enable => true,
+            description => <<"My example ", SourceTypeBin/binary, " source">>,
+            connector => <<ConnectorTypeBin/binary, "_connector">>,
+            resource_opts => #{
+                health_check_interval => "30s"
+            }
+        },
+        [
+            SourceValues,
+            method_values(source, Method, SourceType)
+        ]
+    ).
+
+sources_examples(Method) ->
+    MergeFun =
+        fun(Example, Examples) ->
+            maps:merge(Examples, Example)
+        end,
+    Fun =
+        fun(Module, Examples) ->
+            ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
+            lists:foldl(MergeFun, Examples, ConnectorExamples)
+        end,
+    SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_sources()],
+    lists:foldl(Fun, #{}, SchemaModules).
+
+%%---------------------------------------------
+%% Common helpers
+%%---------------------------------------------
+
 api_ref(Module, Type, Method) ->
     {Type, ref(Module, Method)}.
 
@@ -111,41 +228,17 @@ bridge_api_union(Refs) ->
             end
     end.
 
--type http_method() :: get | post | put.
--type schema_example_map() :: #{atom() => term()}.
-
--spec action_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map().
-action_values(Method, ActionType, ConnectorType, ActionValues) ->
-    ActionTypeBin = atom_to_binary(ActionType),
-    ConnectorTypeBin = atom_to_binary(ConnectorType),
-    lists:foldl(
-        fun(M1, M2) ->
-            maps:merge(M1, M2)
-        end,
-        #{
-            enable => true,
-            description => <<"My example ", ActionTypeBin/binary, " action">>,
-            connector => <<ConnectorTypeBin/binary, "_connector">>,
-            resource_opts => #{
-                health_check_interval => "30s"
-            }
-        },
-        [
-            ActionValues,
-            method_values(Method, ActionType)
-        ]
-    ).
-
--spec method_values(http_method(), atom()) -> schema_example_map().
-method_values(post, Type) ->
+-spec method_values(action | source, http_method(), atom()) -> schema_example_map().
+method_values(Kind, post, Type) ->
+    KindBin = atom_to_binary(Kind),
     TypeBin = atom_to_binary(Type),
     #{
-        name => <<TypeBin/binary, "_action">>,
+        name => <<TypeBin/binary, "_", KindBin/binary>>,
         type => TypeBin
     };
-method_values(get, Type) ->
+method_values(Kind, get, Type) ->
     maps:merge(
-        method_values(post, Type),
+        method_values(Kind, post, Type),
         #{
             status => <<"connected">>,
             node_status => [
@@ -156,7 +249,7 @@ method_values(get, Type) ->
             ]
         }
     );
-method_values(put, _Type) ->
+method_values(_Kind, put, _Type) ->
     #{}.
 
 api_fields("get_bridge_v2", Type, Fields) ->
@@ -175,16 +268,33 @@ api_fields("post_bridge_v2", Type, Fields) ->
         ]
     );
 api_fields("put_bridge_v2", _Type, Fields) ->
+    Fields;
+api_fields("get_source", Type, Fields) ->
+    lists:append(
+        [
+            emqx_bridge_schema:type_and_name_fields(Type),
+            emqx_bridge_schema:status_fields(),
+            Fields
+        ]
+    );
+api_fields("post_source", Type, Fields) ->
+    lists:append(
+        [
+            emqx_bridge_schema:type_and_name_fields(Type),
+            Fields
+        ]
+    );
+api_fields("put_source", _Type, Fields) ->
     Fields.
 
 %%======================================================================================
 %% HOCON Schema Callbacks
 %%======================================================================================
 
-namespace() -> "actions".
+namespace() -> "actions_and_sources".
 
 tags() ->
-    [<<"Actions">>].
+    [<<"Actions">>, <<"Sources">>].
 
 -dialyzer({nowarn_function, roots/0}).
 
@@ -231,13 +341,21 @@ desc(resource_opts) ->
 desc(_) ->
     undefined.
 
--spec types() -> [action_type()].
-types() ->
+-spec action_types() -> [action_type()].
+action_types() ->
     proplists:get_keys(?MODULE:fields(actions)).
 
--spec types_sc() -> ?ENUM([action_type()]).
-types_sc() ->
-    hoconsc:enum(types()).
+-spec action_types_sc() -> ?ENUM([action_type()]).
+action_types_sc() ->
+    hoconsc:enum(action_types()).
+
+-spec source_types() -> [source_type()].
+source_types() ->
+    proplists:get_keys(?MODULE:fields(sources)).
+
+-spec source_types_sc() -> ?ENUM([source_type()]).
+source_types_sc() ->
+    hoconsc:enum(source_types()).
 
 resource_opts_fields() ->
     resource_opts_fields(_Overrides = []).
@@ -268,19 +386,6 @@ resource_opts_fields(Overrides) ->
         emqx_resource_schema:create_opts(Overrides)
     ).
 
-examples(Method) ->
-    MergeFun =
-        fun(Example, Examples) ->
-            maps:merge(Examples, Example)
-        end,
-    Fun =
-        fun(Module, Examples) ->
-            ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
-            lists:foldl(MergeFun, Examples, ConnectorExamples)
-        end,
-    SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_actions()],
-    lists:foldl(Fun, #{}, SchemaModules).
-
 top_level_common_action_keys() ->
     [
         <<"connector">>,

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -104,7 +104,7 @@ setup_mocks() ->
     catch meck:new(emqx_bridge_v2_schema, MeckOpts),
     meck:expect(
         emqx_bridge_v2_schema,
-        registered_api_schemas,
+        registered_actions_api_schemas,
         1,
         fun(Method) ->
             [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v2_" ++ Method)}]

+ 120 - 69
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -12,6 +12,9 @@
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
+-define(ROOT_KEY_ACTIONS, actions).
+-define(ROOT_KEY_SOURCES, sources).
+
 %% ct setup helpers
 
 init_per_suite(Config, Apps) ->
@@ -152,6 +155,49 @@ create_bridge(Config, Overrides) ->
     ct:pal("creating bridge with config: ~p", [BridgeConfig]),
     emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig).
 
+get_ct_config_with_fallback(Config, [Key]) ->
+    ?config(Key, Config);
+get_ct_config_with_fallback(Config, [Key | Rest]) ->
+    case ?config(Key, Config) of
+        undefined ->
+            get_ct_config_with_fallback(Config, Rest);
+        X ->
+            X
+    end.
+
+get_config_by_kind(Config, Overrides) ->
+    Kind = ?config(bridge_kind, Config),
+    get_config_by_kind(Kind, Config, Overrides).
+
+get_config_by_kind(Kind, Config, Overrides) ->
+    case Kind of
+        action ->
+            %% TODO: refactor tests to use action_type...
+            ActionType = get_ct_config_with_fallback(Config, [action_type, bridge_type]),
+            ActionName = get_ct_config_with_fallback(Config, [action_name, bridge_name]),
+            ActionConfig0 = get_ct_config_with_fallback(Config, [action_config, bridge_config]),
+            ActionConfig = emqx_utils_maps:deep_merge(ActionConfig0, Overrides),
+            #{type => ActionType, name => ActionName, config => ActionConfig};
+        source ->
+            SourceType = ?config(source_type, Config),
+            SourceName = ?config(source_name, Config),
+            SourceConfig0 = ?config(source_config, Config),
+            SourceConfig = emqx_utils_maps:deep_merge(SourceConfig0, Overrides),
+            #{type => SourceType, name => SourceName, config => SourceConfig}
+    end.
+
+api_path_root(Kind) ->
+    case Kind of
+        action -> "actions";
+        source -> "sources"
+    end.
+
+conf_root_key(Kind) ->
+    case Kind of
+        action -> ?ROOT_KEY_ACTIONS;
+        source -> ?ROOT_KEY_SOURCES
+    end.
+
 maybe_json_decode(X) ->
     case emqx_utils_json:safe_decode(X, [return_maps]) of
         {ok, Decoded} -> Decoded;
@@ -218,26 +264,26 @@ create_bridge_api(Config) ->
     create_bridge_api(Config, _Overrides = #{}).
 
 create_bridge_api(Config, Overrides) ->
-    BridgeType = ?config(bridge_type, Config),
-    BridgeName = ?config(bridge_name, Config),
-    BridgeConfig0 = ?config(bridge_config, Config),
-    BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
-
     {ok, {{_, 201, _}, _, _}} = create_connector_api(Config),
-
-    Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
-    Path = emqx_mgmt_api_test_util:api_path(["actions"]),
-    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    Opts = #{return_all => true},
-    ct:pal("creating bridge (via http): ~p", [Params]),
-    Res =
-        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
-            {ok, {Status, Headers, Body0}} ->
-                {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
-            Error ->
-                Error
-        end,
-    ct:pal("bridge create result: ~p", [Res]),
+    create_kind_api(Config, Overrides).
+
+create_kind_api(Config) ->
+    create_kind_api(Config, _Overrides = #{}).
+
+create_kind_api(Config, Overrides) ->
+    Kind = proplists:get_value(bridge_kind, Config, action),
+    #{
+        type := Type,
+        name := Name,
+        config := BridgeConfig
+    } = get_config_by_kind(Kind, Config, Overrides),
+    Params = BridgeConfig#{<<"type">> => Type, <<"name">> => Name},
+    PathRoot = api_path_root(Kind),
+    Path = emqx_mgmt_api_test_util:api_path([PathRoot]),
+    ct:pal("creating bridge (~s, http):\n  ~p", [Kind, Params]),
+    Method = post,
+    Res = request(Method, Path, Params),
+    ct:pal("bridge create (~s, http) result:\n  ~p", [Kind, Res]),
     Res.
 
 create_connector_api(Config) ->
@@ -288,27 +334,29 @@ update_bridge_api(Config) ->
     update_bridge_api(Config, _Overrides = #{}).
 
 update_bridge_api(Config, Overrides) ->
-    BridgeType = ?config(bridge_type, Config),
-    Name = ?config(bridge_name, Config),
-    BridgeConfig0 = ?config(bridge_config, Config),
-    BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
-    BridgeId = emqx_bridge_resource:bridge_id(BridgeType, Name),
-    Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]),
-    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    Opts = #{return_all => true},
-    ct:pal("updating bridge (via http): ~p", [BridgeConfig]),
-    Res =
-        case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, BridgeConfig, Opts) of
-            {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
-            Error -> Error
-        end,
-    ct:pal("bridge update result: ~p", [Res]),
+    Kind = proplists:get_value(bridge_kind, Config, action),
+    #{
+        type := Type,
+        name := Name,
+        config := Params
+    } = get_config_by_kind(Kind, Config, Overrides),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    PathRoot = api_path_root(Kind),
+    Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId]),
+    ct:pal("updating bridge (~s, http):\n  ~p", [Kind, Params]),
+    Method = put,
+    Res = request(Method, Path, Params),
+    ct:pal("update bridge (~s, http) result:\n  ~p", [Kind, Res]),
     Res.
 
 op_bridge_api(Op, BridgeType, BridgeName) ->
+    op_bridge_api(_Kind = action, Op, BridgeType, BridgeName).
+
+op_bridge_api(Kind, Op, BridgeType, BridgeName) ->
     BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
-    Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId, Op]),
-    ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
+    PathRoot = api_path_root(Kind),
+    Path = emqx_mgmt_api_test_util:api_path([PathRoot, BridgeId, Op]),
+    ct:pal("calling bridge ~p (~s, http):\n  ~p", [BridgeId, Kind, Op]),
     Method = post,
     Params = [],
     Res = request(Method, Path, Params),
@@ -326,17 +374,16 @@ probe_bridge_api(Config, Overrides) ->
     probe_bridge_api(BridgeType, BridgeName, BridgeConfig).
 
 probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
+    probe_bridge_api(action, BridgeType, BridgeName, BridgeConfig).
+
+probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) ->
     Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
-    Path = emqx_mgmt_api_test_util:api_path(["actions_probe"]),
-    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    Opts = #{return_all => true},
-    ct:pal("probing bridge (via http): ~p", [Params]),
-    Res =
-        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
-            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
-            Error -> Error
-        end,
-    ct:pal("bridge probe result: ~p", [Res]),
+    PathRoot = api_path_root(Kind),
+    Path = emqx_mgmt_api_test_util:api_path([PathRoot ++ "_probe"]),
+    ct:pal("probing bridge (~s, http):\n  ~p", [Kind, Params]),
+    Method = post,
+    Res = request(Method, Path, Params),
+    ct:pal("bridge probe (~s, http) result:\n  ~p", [Kind, Res]),
     Res.
 
 list_bridges_http_api_v1() ->
@@ -353,6 +400,13 @@ list_actions_http_api() ->
     ct:pal("list actions (http v2) result:\n  ~p", [Res]),
     Res.
 
+list_sources_http_api() ->
+    Path = emqx_mgmt_api_test_util:api_path(["sources"]),
+    ct:pal("list sources (http v2)"),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list sources (http v2) result:\n  ~p", [Res]),
+    Res.
+
 list_connectors_http_api() ->
     Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
     ct:pal("list connectors"),
@@ -506,13 +560,6 @@ t_create_via_http(Config) ->
         begin
             ?assertMatch({ok, _}, create_bridge_api(Config)),
 
-            %% lightweight matrix testing some configs
-            ?assertMatch(
-                {ok, _},
-                update_bridge_api(
-                    Config
-                )
-            ),
             ?assertMatch(
                 {ok, _},
                 update_bridge_api(
@@ -526,23 +573,26 @@ t_create_via_http(Config) ->
     ok.
 
 t_start_stop(Config, StopTracePoint) ->
-    BridgeType = ?config(bridge_type, Config),
-    BridgeName = ?config(bridge_name, Config),
-    BridgeConfig = ?config(bridge_config, Config),
+    Kind = proplists:get_value(bridge_kind, Config, action),
     ConnectorName = ?config(connector_name, Config),
     ConnectorType = ?config(connector_type, Config),
-    ConnectorConfig = ?config(connector_config, Config),
+    #{
+        type := Type,
+        name := Name,
+        config := BridgeConfig
+    } = get_config_by_kind(Kind, Config, _Overrides = #{}),
 
     ?assertMatch(
-        {ok, _},
-        emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig)
+        {ok, {{_, 201, _}, _, _}},
+        create_connector_api(Config)
     ),
 
     ?check_trace(
         begin
             ProbeRes0 = probe_bridge_api(
-                BridgeType,
-                BridgeName,
+                Kind,
+                Type,
+                Name,
                 BridgeConfig
             ),
             ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
@@ -550,8 +600,9 @@ t_start_stop(Config, StopTracePoint) ->
             AtomsBefore = erlang:system_info(atom_count),
             %% Probe again; shouldn't have created more atoms.
             ProbeRes1 = probe_bridge_api(
-                BridgeType,
-                BridgeName,
+                Kind,
+                Type,
+                Name,
                 BridgeConfig
             ),
 
@@ -559,9 +610,9 @@ t_start_stop(Config, StopTracePoint) ->
             AtomsAfter = erlang:system_info(atom_count),
             ?assertEqual(AtomsBefore, AtomsAfter),
 
-            ?assertMatch({ok, _}, emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig)),
+            ?assertMatch({ok, _}, create_kind_api(Config)),
 
-            ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+            ResourceId = emqx_bridge_resource:resource_id(conf_root_key(Kind), Type, Name),
 
             %% Since the connection process is async, we give it some time to
             %% stabilize and avoid flakiness.
@@ -574,7 +625,7 @@ t_start_stop(Config, StopTracePoint) ->
             %% `start` bridge to trigger `already_started`
             ?assertMatch(
                 {ok, {{_, 204, _}, _Headers, []}},
-                emqx_bridge_v2_testlib:op_bridge_api("start", BridgeType, BridgeName)
+                op_bridge_api(Kind, "start", Type, Name)
             ),
 
             ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
@@ -624,10 +675,10 @@ t_start_stop(Config, StopTracePoint) ->
                 )
             ),
 
-            ok
+            #{resource_id => ResourceId}
         end,
-        fun(Trace) ->
-            ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+        fun(Res, Trace) ->
+            #{resource_id := ResourceId} = Res,
             %% one for each probe, one for real
             ?assertMatch(
                 [_, _, #{instance_id := ResourceId}],

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_v2_tests.erl

@@ -108,7 +108,7 @@ connector_resource_opts_test() ->
     ok.
 
 actions_api_spec_post_fields_test() ->
-    ?UNION(Union) = emqx_bridge_v2_schema:post_request(),
+    ?UNION(Union) = emqx_bridge_v2_schema:actions_post_request(),
     Schemas =
         lists:map(
             fun(?R_REF(SchemaMod, StructName)) ->

+ 3 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -237,7 +237,9 @@ on_stop(ResourceId, State) ->
             ets:delete(TopicToHandlerIndex)
     end,
     Allocated = emqx_resource:get_allocated_resources(ResourceId),
-    ok = stop_helper(Allocated).
+    ok = stop_helper(Allocated),
+    ?tp(mqtt_connector_stopped, #{instance_id => ResourceId}),
+    ok.
 
 stop_helper(#{pool_name := PoolName}) ->
     emqx_resource_pool:stop(PoolName).

+ 15 - 8
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl

@@ -27,6 +27,9 @@
     conn_bridge_examples/1
 ]).
 
+-define(ACTION_TYPE, mqtt).
+-define(SOURCE_TYPE, mqtt).
+
 %%======================================================================================
 %% Hocon Schema Definitions
 namespace() -> "bridge_mqtt_publisher".
@@ -86,14 +89,18 @@ fields(action_resource_opts) ->
         fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
         emqx_bridge_v2_schema:resource_opts_fields()
     );
-fields("get_connector") ->
-    emqx_bridge_mqtt_connector_schema:fields("config_connector");
-fields("get_bridge_v2") ->
-    fields("mqtt_publisher_action");
-fields("post_bridge_v2") ->
-    fields("mqtt_publisher_action") ++ emqx_bridge_schema:type_and_name_fields(mqtt);
-fields("put_bridge_v2") ->
-    fields("mqtt_publisher_action");
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields("mqtt_publisher_action"));
+fields(Field) when
+    Field == "get_source";
+    Field == "post_source";
+    Field == "put_source"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields("mqtt_subscriber_source"));
 fields(What) ->
     error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}).
 %% v2: api schema

+ 175 - 0
apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl

@@ -0,0 +1,175 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_mqtt_v2_subscriber_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("emqx/include/asserts.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_connector,
+            emqx_bridge_mqtt,
+            emqx_bridge,
+            emqx_rule_engine,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, Api} = emqx_common_test_http:create_default_app(),
+    [
+        {apps, Apps},
+        {api, Api}
+        | Config
+    ].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
+    ConnectorConfig = connector_config(),
+    SourceConfig = source_config(#{connector => Name}),
+    [
+        {bridge_kind, source},
+        {source_type, mqtt},
+        {source_name, Name},
+        {source_config, SourceConfig},
+        {connector_type, mqtt},
+        {connector_name, Name},
+        {connector_config, ConnectorConfig}
+        | Config
+    ].
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+connector_config() ->
+    %% !!!!!!!!!!!! FIXME!!!!!! add more fields ("server_configs")
+    #{
+        <<"enable">> => true,
+        <<"description">> => <<"my connector">>,
+        <<"pool_size">> => 3,
+        <<"server">> => <<"127.0.0.1:1883">>,
+        <<"resource_opts">> => #{
+            <<"health_check_interval">> => <<"15s">>,
+            <<"start_after_created">> => true,
+            <<"start_timeout">> => <<"5s">>
+        }
+    }.
+
+source_config(Overrides0) ->
+    Overrides = emqx_utils_maps:binary_key_map(Overrides0),
+    CommonConfig =
+        #{
+            <<"enable">> => true,
+            <<"connector">> => <<"please override">>,
+            <<"parameters">> =>
+                #{
+                    <<"remote">> =>
+                        #{
+                            <<"topic">> => <<"remote/topic">>,
+                            <<"qos">> => 2
+                        },
+                    <<"local">> =>
+                        #{
+                            <<"topic">> => <<"local/topic">>,
+                            <<"qos">> => 2,
+                            <<"retain">> => false,
+                            <<"payload">> => <<"${payload}">>
+                        }
+                },
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1,
+                <<"batch_time">> => <<"0ms">>,
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"15s">>,
+                <<"inflight_window">> => 100,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"45s">>,
+                <<"resume_interval">> => <<"15s">>,
+                <<"worker_pool_size">> => <<"1">>
+            }
+        },
+    maps:merge(CommonConfig, Overrides).
+
+replace(Key, Value, Proplist) ->
+    lists:keyreplace(Key, 1, Proplist, {Key, Value}).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_create_via_http(Config) ->
+    ConnectorName = ?config(connector_name, Config),
+    ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, [
+                #{
+                    <<"enable">> := true,
+                    <<"status">> := <<"connected">>
+                }
+            ]}},
+        emqx_bridge_v2_testlib:list_bridges_http_api_v1()
+    ),
+    NewSourceName = <<"my_other_source">>,
+    {ok, {{_, 201, _}, _, _}} =
+        emqx_bridge_v2_testlib:create_kind_api(
+            replace(source_name, NewSourceName, Config)
+        ),
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, [
+                #{<<"connector">> := ConnectorName},
+                #{<<"connector">> := ConnectorName}
+            ]}},
+        emqx_bridge_v2_testlib:list_sources_http_api()
+    ),
+    ?assertMatch(
+        {ok, {{_, 200, _}, _, []}},
+        emqx_bridge_v2_testlib:list_bridges_http_api_v1()
+    ),
+    ok.
+
+t_start_stop(Config) ->
+    ok = emqx_bridge_v2_testlib:t_start_stop(Config, mqtt_connector_stopped),
+    ok.