Просмотр исходного кода

Merge remote-tracking branch 'olcai/refactor-bridges-api' into dev/api-refactor

Stefan Strigler 3 лет назад
Родитель
Сommit
e08c1d2229

+ 39 - 8
apps/emqx_bridge/i18n/emqx_bridge_api.conf

@@ -2,8 +2,8 @@ emqx_bridge_api {
 
     desc_param_path_operation_cluster {
                    desc {
-                         en: """Operations can be one of: enable, disable, start, stop, restart"""
-                         zh: """集群可用操作:启用、禁用、启动、停止、重新启动"""
+                         en: """Operations can be one of: stop, restart"""
+                         zh: """集群可用操作:停止、重新启动"""
                         }
                    label: {
                            en: "Cluster Operation"
@@ -44,6 +44,16 @@ emqx_bridge_api {
                           }
                   }
 
+    desc_param_path_enable {
+                   desc {
+                         en: """Whether or not the bridge is enabled"""
+                         zh: """是否启用桥接"""
+                        }
+                   label: {
+                           en: "Enable bridge"
+                           zh: "启用桥接"
+                          }
+                  }
     desc_api1 {
                    desc {
                          en: """List all created bridges"""
@@ -112,8 +122,8 @@ emqx_bridge_api {
 
     desc_api7 {
                    desc {
-                         en: """Enable/Disable/Stop/Restart bridges on all nodes in the cluster."""
-                         zh: """在集群中的所有节点上启用/禁用/停止/重新启动 Bridge。"""
+                         en: """Stop/Restart bridges on all nodes in the cluster."""
+                         zh: """停止或启用所有节点上的桥接"""
                         }
                    label: {
                            en: "Cluster Bridge Operate"
@@ -123,10 +133,8 @@ emqx_bridge_api {
 
     desc_api8 {
                    desc {
-                         en: """Stop/Restart bridges on a specific node.
- NOTE: It's not allowed to disable/enable bridges on a single node."""
-                         zh: """在某个节点上停止/重新启动 Bridge。
-NOTE:不允许在单节点上启用/禁用 Bridge"""
+                         en: """Stop/Restart bridges on a specific node."""
+                         zh: """在某个节点上停止/重新启动 Bridge。"""
                         }
                    label: {
                            en: "Node Bridge Operate"
@@ -150,4 +158,27 @@ ID 的格式必须为 ’{type}:{name}”
                         zh: "测试桥接创建"
                         }
               }
+
+    desc_bridge_metrics {
+                   desc {
+                         en: """Get bridge metrics by Id"""
+                         zh: """通过 Id 来获取桥接的指标信息"""
+                        }
+                   label: {
+                           en: "Get Bridge Metrics"
+                           zh: "获取桥接的指标"
+                          }
+                  }
+
+    desc_enable_bridge {
+                   desc {
+                         en: """Enable or Disable bridges on all nodes in the cluster."""
+                         zh: """启用或禁用所有节点上的桥接"""
+                        }
+                   label: {
+                           en: "Cluster Bridge Enable"
+                           zh: "是否启用集群内的桥接"
+                          }
+                  }
+
 }

+ 122 - 42
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -36,9 +36,11 @@
 -export([
     '/bridges'/2,
     '/bridges/:id'/2,
-    '/bridges/:id/operation/:operation'/2,
-    '/nodes/:node/bridges/:id/operation/:operation'/2,
-    '/bridges/:id/reset_metrics'/2,
+    '/bridges/:id/enable/:enable'/2,
+    '/bridges/:id/:operation'/2,
+    '/nodes/:node/bridges/:id/:operation'/2,
+    '/bridges/:id/metrics'/2,
+    '/bridges/:id/metrics/reset'/2,
     '/bridges_probe'/2
 ]).
 
@@ -67,9 +69,11 @@ paths() ->
     [
         "/bridges",
         "/bridges/:id",
-        "/bridges/:id/operation/:operation",
-        "/nodes/:node/bridges/:id/operation/:operation",
-        "/bridges/:id/reset_metrics",
+        "/bridges/:id/enable/:enable",
+        "/bridges/:id/:operation",
+        "/nodes/:node/bridges/:id/:operation",
+        "/bridges/:id/metrics",
+        "/bridges/:id/metrics/reset",
         "/bridges_probe"
     ].
 
@@ -89,7 +93,7 @@ get_response_body_schema() ->
 param_path_operation_cluster() ->
     {operation,
         mk(
-            enum([enable, disable, stop, restart]),
+            enum([stop, restart]),
             #{
                 in => path,
                 required => true,
@@ -105,7 +109,7 @@ param_path_operation_on_node() ->
             #{
                 in => path,
                 required => true,
-                example => <<"start">>,
+                example => <<"stop">>,
                 desc => ?DESC("desc_param_path_operation_on_node")
             }
         )}.
@@ -134,19 +138,34 @@ param_path_id() ->
             }
         )}.
 
-bridge_info_array_example(Method) ->
-    [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
+param_path_enable() ->
+    {enable,
+        mk(
+            boolean(),
+            #{
+                in => path,
+                required => true,
+                desc => ?DESC("desc_param_path_enable"),
+                example => true
+            }
+        )}.
+
+bridge_info_array_example(Method, WithMetrics) ->
+    [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))].
 
 bridge_info_examples(Method) ->
+    bridge_info_examples(Method, false).
+
+bridge_info_examples(Method, WithMetrics) ->
     maps:merge(
         #{
             <<"webhook_example">> => #{
                 summary => <<"WebHook">>,
-                value => info_example(webhook, Method)
+                value => info_example(webhook, Method, WithMetrics)
             },
             <<"mqtt_example">> => #{
                 summary => <<"MQTT Bridge">>,
-                value => info_example(mqtt, Method)
+                value => info_example(mqtt, Method, WithMetrics)
             }
         },
         ee_bridge_examples(Method)
@@ -159,24 +178,24 @@ ee_bridge_examples(Method) ->
         _:_ -> #{}
     end.
 
-info_example(Type, Method) ->
+info_example(Type, Method, WithMetrics) ->
     maps:merge(
         info_example_basic(Type),
-        method_example(Type, Method)
+        method_example(Type, Method, WithMetrics)
     ).
 
-method_example(Type, Method) when Method == get; Method == post ->
+method_example(Type, Method, WithMetrics) when Method == get; Method == post ->
     SType = atom_to_list(Type),
     SName = SType ++ "_example",
     TypeNameExam = #{
         type => bin(SType),
         name => bin(SName)
     },
-    maybe_with_metrics_example(TypeNameExam, Method);
-method_example(_Type, put) ->
+    maybe_with_metrics_example(TypeNameExam, Method, WithMetrics);
+method_example(_Type, put, _WithMetrics) ->
     #{}.
 
-maybe_with_metrics_example(TypeNameExam, get) ->
+maybe_with_metrics_example(TypeNameExam, get, true) ->
     TypeNameExam#{
         metrics => ?EMPTY_METRICS,
         node_metrics => [
@@ -186,7 +205,7 @@ maybe_with_metrics_example(TypeNameExam, get) ->
             }
         ]
     };
-maybe_with_metrics_example(TypeNameExam, _) ->
+maybe_with_metrics_example(TypeNameExam, _, _) ->
     TypeNameExam.
 
 info_example_basic(webhook) ->
@@ -276,7 +295,7 @@ schema("/bridges") ->
             responses => #{
                 200 => emqx_dashboard_swagger:schema_with_example(
                     array(emqx_bridge_schema:get_response()),
-                    bridge_info_array_example(get)
+                    bridge_info_array_example(get, true)
                 )
             }
         },
@@ -336,9 +355,23 @@ schema("/bridges/:id") ->
             }
         }
     };
-schema("/bridges/:id/reset_metrics") ->
+schema("/bridges/:id/metrics") ->
     #{
-        'operationId' => '/bridges/:id/reset_metrics',
+        'operationId' => '/bridges/:id/metrics',
+        get => #{
+            tags => [<<"bridges">>],
+            summary => <<"Get Bridge Metrics">>,
+            description => ?DESC("desc_bridge_metrics"),
+            parameters => [param_path_id()],
+            responses => #{
+                200 => emqx_bridge_schema:metrics_fields(),
+                404 => error_schema('NOT_FOUND', "Bridge not found")
+            }
+        }
+    };
+schema("/bridges/:id/metrics/reset") ->
+    #{
+        'operationId' => '/bridges/:id/metrics/reset',
         put => #{
             tags => [<<"bridges">>],
             summary => <<"Reset Bridge Metrics">>,
@@ -350,12 +383,29 @@ schema("/bridges/:id/reset_metrics") ->
             }
         }
     };
-schema("/bridges/:id/operation/:operation") ->
+schema("/bridges/:id/enable/:enable") ->
     #{
-        'operationId' => '/bridges/:id/operation/:operation',
+        'operationId' => '/bridges/:id/enable/:enable',
+        put =>
+            #{
+                tags => [<<"bridges">>],
+                summary => <<"Enable or Disable Bridge">>,
+                desc => ?DESC("desc_enable_bridge"),
+                parameters => [param_path_id(), param_path_enable()],
+                responses =>
+                    #{
+                        204 => <<"Success">>,
+                        400 => error_schema('INVALID_ID', "Bad bridge ID"),
+                        503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
+                    }
+            }
+    };
+schema("/bridges/:id/:operation") ->
+    #{
+        'operationId' => '/bridges/:id/:operation',
         post => #{
             tags => [<<"bridges">>],
-            summary => <<"Enable/Disable/Stop/Restart Bridge">>,
+            summary => <<"Stop or Restart Bridge">>,
             description => ?DESC("desc_api7"),
             parameters => [
                 param_path_id(),
@@ -368,9 +418,9 @@ schema("/bridges/:id/operation/:operation") ->
             }
         }
     };
-schema("/nodes/:node/bridges/:id/operation/:operation") ->
+schema("/nodes/:node/bridges/:id/:operation") ->
     #{
-        'operationId' => '/nodes/:node/bridges/:id/operation/:operation',
+        'operationId' => '/nodes/:node/bridges/:id/:operation',
         post => #{
             tags => [<<"bridges">>],
             summary => <<"Stop/Restart Bridge">>,
@@ -474,7 +524,10 @@ schema("/bridges_probe") ->
         end
     ).
 
-'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) ->
+'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)).
+
+'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(
         Id,
         case
@@ -502,10 +555,18 @@ schema("/bridges_probe") ->
     end.
 
 lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
+    FormatFun = fun format_bridge_info_without_metrics/1,
+    do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
+
+lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
+    FormatFun = fun format_bridge_metrics/1,
+    do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
+
+do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
     Nodes = mria_mnesia:running_nodes(),
     case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
         {ok, [{ok, _} | _] = Results} ->
-            {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
+            {SuccCode, FormatFun([R || {ok, R} <- Results])};
         {ok, [{error, not_found} | _]} ->
             {404, error_msg('NOT_FOUND', <<"not_found">>)};
         {error, ErrL} ->
@@ -518,19 +579,16 @@ lookup_from_local_node(BridgeType, BridgeName) ->
         Error -> Error
     end.
 
-'/bridges/:id/operation/:operation'(post, #{
-    bindings :=
-        #{id := Id, operation := Op}
-}) ->
+'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
     ?TRY_PARSE_ID(
         Id,
-        case operation_func(Op) of
+        case enable_func(Enable) of
             invalid ->
                 {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
-            OperFunc when OperFunc == enable; OperFunc == disable ->
+            OperFunc ->
                 case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of
                     {ok, _} ->
-                        {200};
+                        {204};
                     {error, {pre_config_update, _, bridge_not_found}} ->
                         {404, error_msg('NOT_FOUND', <<"bridge not found">>)};
                     {error, {_, _, timeout}} ->
@@ -539,14 +597,26 @@ lookup_from_local_node(BridgeType, BridgeName) ->
                         {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
                     {error, Reason} ->
                         {500, error_msg('INTERNAL_ERROR', Reason)}
-                end;
+                end
+        end
+    ).
+
+'/bridges/:id/:operation'(post, #{
+    bindings :=
+        #{id := Id, operation := Op}
+}) ->
+    ?TRY_PARSE_ID(
+        Id,
+        case operation_func(Op) of
+            invalid ->
+                {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
             OperFunc ->
                 Nodes = mria_mnesia:running_nodes(),
                 operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName)
         end
     ).
 
-'/nodes/:node/bridges/:id/operation/:operation'(post, #{
+'/nodes/:node/bridges/:id/:operation'(post, #{
     bindings :=
         #{id := Id, operation := Op, node := Node}
 }) ->
@@ -576,10 +646,12 @@ node_operation_func(_) -> invalid.
 
 operation_func(<<"stop">>) -> stop;
 operation_func(<<"restart">>) -> restart;
-operation_func(<<"enable">>) -> enable;
-operation_func(<<"disable">>) -> disable;
 operation_func(_) -> invalid.
 
+enable_func(<<"true">>) -> enable;
+enable_func(<<"false">>) -> disable;
+enable_func(_) -> invalid.
+
 operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
     RpcFunc =
         case OperFunc of
@@ -605,7 +677,7 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
     lists:foldl(
         fun(#{type := Type, name := Name}, Acc) ->
             Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
-            [format_bridge_info(Bridges) | Acc]
+            [format_bridge_info_with_metrics(Bridges) | Acc]
         end,
         [],
         BridgesFirstNode
@@ -639,7 +711,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
         BridgesAllNodes
     ).
 
-format_bridge_info([FirstBridge | _] = Bridges) ->
+format_bridge_info_with_metrics([FirstBridge | _] = Bridges) ->
     Res = maps:remove(node, FirstBridge),
     NodeStatus = collect_status(Bridges),
     NodeMetrics = collect_metrics(Bridges),
@@ -650,6 +722,14 @@ format_bridge_info([FirstBridge | _] = Bridges) ->
         node_metrics => NodeMetrics
     }).
 
+format_bridge_info_without_metrics(Bridges) ->
+    Res = format_bridge_info_with_metrics(Bridges),
+    maps:without([metrics, node_metrics], Res).
+
+format_bridge_metrics(Bridges) ->
+    Res = format_bridge_info_with_metrics(Bridges),
+    maps:with([metrics, node_metrics], Res).
+
 collect_status(Bridges) ->
     [maps:with([node, status], B) || B <- Bridges].
 

+ 1 - 1
apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl

@@ -51,7 +51,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("config").
+    emqx_bridge_schema:status_fields() ++ fields("config").
 
 desc("config") ->
     ?DESC("config");

+ 13 - 8
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -30,7 +30,8 @@
 
 -export([
     common_bridge_fields/0,
-    metrics_status_fields/0
+    status_fields/0,
+    metrics_fields/0
 ]).
 
 %%======================================================================================
@@ -83,14 +84,8 @@ common_bridge_fields() ->
             )}
     ].
 
-metrics_status_fields() ->
+status_fields() ->
     [
-        {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})},
-        {"node_metrics",
-            mk(
-                hoconsc:array(ref(?MODULE, "node_metrics")),
-                #{desc => ?DESC("desc_node_metrics")}
-            )},
         {"status", mk(status(), #{desc => ?DESC("desc_status")})},
         {"node_status",
             mk(
@@ -99,6 +94,16 @@ metrics_status_fields() ->
             )}
     ].
 
+metrics_fields() ->
+    [
+        {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})},
+        {"node_metrics",
+            mk(
+                hoconsc:array(ref(?MODULE, "node_metrics")),
+                #{desc => ?DESC("desc_node_metrics")}
+            )}
+    ].
+
 %%======================================================================================
 %% For config files
 

+ 1 - 1
apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl

@@ -38,7 +38,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+    emqx_bridge_schema:status_fields() ++ fields("post");
 fields("creation_opts") ->
     lists:filter(
         fun({K, _V}) ->

+ 96 - 20
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -195,8 +195,6 @@ t_http_crud_apis(Config) ->
         <<"enable">> := true,
         <<"status">> := _,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
 
@@ -233,8 +231,6 @@ t_http_crud_apis(Config) ->
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
-            <<"metrics">> := _,
-            <<"node_metrics">> := [_ | _],
             <<"url">> := URL2
         },
         jsx:decode(Bridge2)
@@ -267,8 +263,6 @@ t_http_crud_apis(Config) ->
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
-            <<"metrics">> := _,
-            <<"node_metrics">> := [_ | _],
             <<"url">> := URL2
         },
         jsx:decode(Bridge3Str)
@@ -464,8 +458,6 @@ do_start_stop_bridges(Type, Config) ->
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
@@ -510,25 +502,23 @@ t_enable_disable_bridges(Config) ->
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% disable it
-    {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
     {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
     %% enable again
-    {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
     %% enable an already started bridge
-    {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
     %% disable it again
-    {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
 
     {ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>),
     ?assertEqual(
@@ -537,7 +527,7 @@ t_enable_disable_bridges(Config) ->
     ),
 
     %% enable a stopped bridge
-    {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
     {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)),
     %% delete the bridge
@@ -563,12 +553,10 @@ t_reset_bridges(Config) ->
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
-    {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []),
+    {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
 
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
@@ -667,10 +655,98 @@ t_bridges_probe(Config) ->
 request(Method, Url, Body) ->
     request(<<"bridge_admin">>, Method, Url, Body).
 
+t_metrics(Config) ->
+    Port = ?config(port, Config),
+    %% assert we there's no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a webhook bridge, using POST
+    %% POST /bridges/ will create a bridge
+    URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
+    {ok, 201, Bridge} = request(
+        post,
+        uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
+    ),
+
+    %ct:pal("---bridge: ~p", [Bridge]),
+    #{
+        <<"type">> := ?BRIDGE_TYPE,
+        <<"name">> := Name,
+        <<"enable">> := true,
+        <<"status">> := _,
+        <<"node_status">> := [_ | _],
+        <<"url">> := URL1
+    } = jsx:decode(Bridge),
+
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
+
+    %% check for empty bridge metrics
+    {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"success">> := 0},
+            <<"node_metrics">> := [_ | _]
+        },
+        jsx:decode(Bridge1Str)
+    ),
+
+    %% check that the bridge doesn't contain metrics anymore
+    {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []),
+    Decoded = jsx:decode(Bridge2Str),
+    ?assertNot(maps:is_key(<<"metrics">>, Decoded)),
+    ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)),
+
+    %% send an message to emqx and the message should be forwarded to the HTTP server
+    Body = <<"my msg">>,
+    emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
+    ?assert(
+        receive
+            {http_server, received, #{
+                method := <<"POST">>,
+                path := <<"/path1">>,
+                body := Body
+            }} ->
+                true;
+            Msg ->
+                ct:pal("error: http got unexpected request: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end
+    ),
+
+    %% check for non-empty bridge metrics
+    {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"success">> := 1},
+            <<"node_metrics">> := [_ | _]
+        },
+        jsx:decode(Bridge3Str)
+    ),
+
+    %% check for non-empty metrics when listing all bridges
+    {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []),
+    ?assertMatch(
+        [
+            #{
+                <<"metrics">> := #{<<"success">> := 1},
+                <<"node_metrics">> := [_ | _]
+            }
+        ],
+        jsx:decode(BridgesStr)
+    ),
+    ok.
+
 operation_path(node, Oper, BridgeID) ->
-    uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]);
+    uri(["nodes", node(), "bridges", BridgeID, Oper]);
 operation_path(cluster, Oper, BridgeID) ->
-    uri(["bridges", BridgeID, "operation", Oper]).
+    uri(["bridges", BridgeID, Oper]).
+
+enable_path(Enable, BridgeID) ->
+    uri(["bridges", BridgeID, "enable", Enable]).
 
 str(S) when is_list(S) -> S;
 str(S) when is_binary(S) -> binary_to_list(S).

+ 21 - 17
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -187,7 +187,7 @@ t_mqtt_conn_bridge_ingress(_) ->
     ),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
+    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
     ?assertMatch(
         #{
             <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
@@ -200,7 +200,7 @@ t_mqtt_conn_bridge_ingress(_) ->
                     }
                 ]
         },
-        jsx:decode(BridgeStr)
+        jsx:decode(BridgeMetricsStr)
     ),
 
     %% delete the bridge
@@ -255,7 +255,7 @@ t_mqtt_conn_bridge_egress(_) ->
     ),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     ?assertMatch(
         #{
             <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
@@ -268,7 +268,7 @@ t_mqtt_conn_bridge_egress(_) ->
                     }
                 ]
         },
-        jsx:decode(BridgeStr)
+        jsx:decode(BridgeMetricsStr)
     ),
 
     %% delete the bridge
@@ -354,7 +354,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
 
-    {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     #{
         <<"metrics">> := #{
             <<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
@@ -371,7 +371,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
                         }
                 }
             ]
-    } = jsx:decode(BridgeStr1),
+    } = jsx:decode(BridgeMetricsStr1),
     timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
@@ -393,7 +393,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
 
     %% verify the metrics of the bridge
     timer:sleep(1000),
-    {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     #{
         <<"metrics">> := #{
             <<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
@@ -410,7 +410,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
                         }
                 }
             ]
-    } = jsx:decode(BridgeStr2),
+    } = jsx:decode(BridgeMetricsStr2),
     ?assertEqual(CntMatched2, CntMatched1 + 1),
     ?assertEqual(CntSuccess2, CntSuccess1 + 1),
     ?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
@@ -513,7 +513,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     ),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
+    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
     ?assertMatch(
         #{
             <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
@@ -526,7 +526,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
                     }
                 ]
         },
-        jsx:decode(BridgeStr)
+        jsx:decode(BridgeMetricsStr)
     ),
 
     {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
@@ -627,7 +627,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     ),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     ?assertMatch(
         #{
             <<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
@@ -641,7 +641,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
                     }
                 ]
         },
-        jsx:decode(BridgeStr)
+        jsx:decode(BridgeMetricsStr)
     ),
 
     {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
@@ -693,7 +693,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     assert_mqtt_msg_received(RemoteTopic, Payload0),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     ?assertMatch(
         #{
             <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
@@ -706,7 +706,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
                     }
                 ]
         },
-        jsx:decode(BridgeStr)
+        jsx:decode(BridgeMetricsStr)
     ),
 
     %% stop the listener 1883 to make the bridge disconnected
@@ -740,7 +740,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
 
     %% verify the metrics of the bridge, the message should be queued
     {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     Decoded1 = jsx:decode(BridgeStr1),
+    DecodedMetrics1 = jsx:decode(BridgeMetricsStr1),
     ?assertMatch(
         Status when (Status == <<"connected">> orelse Status == <<"connecting">>),
         maps:get(<<"status">>, Decoded1)
@@ -753,7 +755,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
             <<"failed">> := 0,
             <<"queuing">> := 2
         } when Matched >= 3,
-        maps:get(<<"metrics">>, Decoded1)
+        maps:get(<<"metrics">>, DecodedMetrics1)
     ),
 
     %% start the listener 1883 to make the bridge reconnected
@@ -761,10 +763,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     timer:sleep(1500),
     %% verify the metrics of the bridge, the 2 queued messages should have been sent
     {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
+    Decoded2 = jsx:decode(BridgeStr2),
+    ?assertEqual(<<"connected">>, maps:get(<<"status">>, Decoded2)),
     %% matched >= 3 because of possible retries.
     ?assertMatch(
         #{
-            <<"status">> := <<"connected">>,
             <<"metrics">> := #{
                 <<"matched">> := Matched,
                 <<"success">> := 3,
@@ -773,7 +777,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
                 <<"retried">> := _
             }
         } when Matched >= 3,
-        jsx:decode(BridgeStr2)
+        jsx:decode(BridgeMetricsStr2)
     ),
     %% also verify the 2 messages have been sent to the remote broker
     assert_mqtt_msg_received(RemoteTopic, Payload1),

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 5 - 0
changes/v5.0.15/feat-9736.en.md

@@ -0,0 +1,5 @@
+Refactor of /bridges API to make it more consistent with other APIs:
+- bridge enable/disable is now done via the endpoint `/bridges/{id}/enable/[true,false]`
+- `/bridges/{id}/operation/{operation}` endpoints are now `/bridges/{id}/{operation}`
+- metrics are moved out from the GET `/bridges/{id}` response and can now be fetched via `/bridges/{id}/metrics`
+- the `bridges/{id}/reset_metrics` endpoint is now `/bridges/{id}/metrics/reset`

+ 5 - 0
changes/v5.0.15/feat-9736.zh.md

@@ -0,0 +1,5 @@
+重构部分 /bridges 的API 使得其和其他 API 能够更加一致:
+- 桥接的启用和禁用现在是通过 `/bridges/{id}/enable/[true,false]` API 来实现的
+- 使用 `/bridges/{id}/{operation}` 替换了旧的 `/bridges/{id}/operation/{operation}` API
+- 指标数据从 `/bridges/{id}` 的响应消息中移除,现在可以使用新的 API  `/bridges/{id}/metrics` 进行访问
+- 使用  `/bridges/{id}/metrics/reset` 替换了旧的 `bridges/{id}/reset_metrics` API

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl

@@ -124,7 +124,7 @@ fields(bridge_config) ->
             )}
     ];
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+    emqx_bridge_schema:status_fields() ++ fields("post");
 fields("post") ->
     [type_field(), name_field() | fields("config")];
 fields("put") ->

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl

@@ -67,7 +67,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post").
 
 field(connector) ->
     mk(

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -139,7 +139,7 @@ method_fileds(get, ConnectorType) ->
     influxdb_bridge_common_fields() ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
-        emqx_bridge_schema:metrics_status_fields();
+        emqx_bridge_schema:status_fields();
 method_fileds(put, ConnectorType) ->
     influxdb_bridge_common_fields() ++
         connector_fields(ConnectorType).

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -67,7 +67,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+    emqx_bridge_schema:status_fields() ++ fields("post");
 fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},

+ 3 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl

@@ -59,15 +59,15 @@ fields("put_sharded") ->
 fields("put_single") ->
     fields(mongodb_single);
 fields("get_rs") ->
-    emqx_bridge_schema:metrics_status_fields() ++
+    emqx_bridge_schema:status_fields() ++
         fields(mongodb_rs) ++
         type_and_name_fields(mongodb_rs);
 fields("get_sharded") ->
-    emqx_bridge_schema:metrics_status_fields() ++
+    emqx_bridge_schema:status_fields() ++
         fields(mongodb_sharded) ++
         type_and_name_fields(mongodb_sharded);
 fields("get_single") ->
-    emqx_bridge_schema:metrics_status_fields() ++
+    emqx_bridge_schema:status_fields() ++
         fields(mongodb_single) ++
         type_and_name_fields(mongodb_single);
 fields("creation_opts") ->

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl

@@ -104,7 +104,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post").
 
 desc("config") ->
     ?DESC("desc_config");

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl

@@ -106,7 +106,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post").
 
 fields("post", Type) ->
     [type_field(Type), name_field() | fields("config")].

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl

@@ -126,7 +126,7 @@ method_fileds(get, ConnectorType) ->
     redis_bridge_common_fields() ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
-        emqx_bridge_schema:metrics_status_fields();
+        emqx_bridge_schema:status_fields();
 method_fileds(put, ConnectorType) ->
     redis_bridge_common_fields() ++
         connector_fields(ConnectorType).

+ 8 - 7
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -220,9 +220,10 @@ kafka_bridge_rest_api_helper(Config) ->
     BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
     BridgesParts = ["bridges"],
     BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
-    OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
-    BridgesPartsOpDisable = OpUrlFun("disable"),
-    BridgesPartsOpEnable = OpUrlFun("enable"),
+    OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, OpName] end,
+    EnableFun = fun(Enable) -> ["bridges", BridgeIdUrlEnc, "enable", Enable] end,
+    BridgesPartsOpDisable = EnableFun("false"),
+    BridgesPartsOpEnable = EnableFun("true"),
     BridgesPartsOpRestart = OpUrlFun("restart"),
     BridgesPartsOpStop = OpUrlFun("stop"),
     %% List bridges
@@ -321,10 +322,10 @@ kafka_bridge_rest_api_helper(Config) ->
     ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
     ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
     %% Perform operations
-    {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
-    {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
-    {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
-    {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
+    {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})),
+    {ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})),
+    {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})),
+    {ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
     {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),