Просмотр исходного кода

feat(emqx_bridge): move metrics to own endpoint, rename reset_metrics

In order for the /bridges APIs to be consistent with other APIs, we move
out metrics from GET /bridges/{id} to its own endpoint,
/bridges/{id}/metrics. We also rename /bridges/reset_metrics to
/bridges/metrics/reset.
Erik Timan 3 лет назад
Родитель
Сommit
860e21d40f

+ 10 - 0
apps/emqx_bridge/i18n/emqx_bridge_api.conf

@@ -134,4 +134,14 @@ NOTE:不允许在单节点上启用/禁用 Bridge"""
                           }
                   }
 
+    desc_bridge_metrics {
+                   desc {
+                         en: """Get bridge metrics by Id"""
+                         zh: """"""
+                        }
+                   label: {
+                           en: "Get Bridge Metrics"
+                           zh: ""
+                          }
+                  }
 }

+ 58 - 20
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -38,7 +38,8 @@
     '/bridges/:id'/2,
     '/bridges/:id/:operation'/2,
     '/nodes/:node/bridges/:id/:operation'/2,
-    '/bridges/:id/reset_metrics'/2
+    '/bridges/:id/metrics'/2,
+    '/bridges/:id/metrics/reset'/2
 ]).
 
 -export([lookup_from_local_node/2]).
@@ -68,7 +69,8 @@ paths() ->
         "/bridges/:id",
         "/bridges/:id/:operation",
         "/nodes/:node/bridges/:id/:operation",
-        "/bridges/:id/reset_metrics"
+        "/bridges/:id/metrics",
+        "/bridges/:id/metrics/reset"
     ].
 
 error_schema(Code, Message) when is_atom(Code) ->
@@ -132,19 +134,22 @@ param_path_id() ->
             }
         )}.
 
-bridge_info_array_example(Method) ->
-    [Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
+bridge_info_array_example(Method, WithMetrics) ->
+    [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))].
 
 bridge_info_examples(Method) ->
+    bridge_info_examples(Method, false).
+
+bridge_info_examples(Method, WithMetrics) ->
     maps:merge(
         #{
             <<"webhook_example">> => #{
                 summary => <<"WebHook">>,
-                value => info_example(webhook, Method)
+                value => info_example(webhook, Method, WithMetrics)
             },
             <<"mqtt_example">> => #{
                 summary => <<"MQTT Bridge">>,
-                value => info_example(mqtt, Method)
+                value => info_example(mqtt, Method, WithMetrics)
             }
         },
         ee_bridge_examples(Method)
@@ -157,24 +162,24 @@ ee_bridge_examples(Method) ->
         _:_ -> #{}
     end.
 
-info_example(Type, Method) ->
+info_example(Type, Method, WithMetrics) ->
     maps:merge(
         info_example_basic(Type),
-        method_example(Type, Method)
+        method_example(Type, Method, WithMetrics)
     ).
 
-method_example(Type, Method) when Method == get; Method == post ->
+method_example(Type, Method, WithMetrics) when Method == get; Method == post ->
     SType = atom_to_list(Type),
     SName = SType ++ "_example",
     TypeNameExam = #{
         type => bin(SType),
         name => bin(SName)
     },
-    maybe_with_metrics_example(TypeNameExam, Method);
-method_example(_Type, put) ->
+    maybe_with_metrics_example(TypeNameExam, Method, WithMetrics);
+method_example(_Type, put, _WithMetrics) ->
     #{}.
 
-maybe_with_metrics_example(TypeNameExam, get) ->
+maybe_with_metrics_example(TypeNameExam, get, true) ->
     TypeNameExam#{
         metrics => ?EMPTY_METRICS,
         node_metrics => [
@@ -184,7 +189,7 @@ maybe_with_metrics_example(TypeNameExam, get) ->
             }
         ]
     };
-maybe_with_metrics_example(TypeNameExam, _) ->
+maybe_with_metrics_example(TypeNameExam, _, _) ->
     TypeNameExam.
 
 info_example_basic(webhook) ->
@@ -274,7 +279,7 @@ schema("/bridges") ->
             responses => #{
                 200 => emqx_dashboard_swagger:schema_with_example(
                     array(emqx_bridge_schema:get_response()),
-                    bridge_info_array_example(get)
+                    bridge_info_array_example(get, true)
                 )
             }
         },
@@ -334,9 +339,23 @@ schema("/bridges/:id") ->
             }
         }
     };
-schema("/bridges/:id/reset_metrics") ->
+schema("/bridges/:id/metrics") ->
     #{
-        'operationId' => '/bridges/:id/reset_metrics',
+        'operationId' => '/bridges/:id/metrics',
+        get => #{
+            tags => [<<"bridges">>],
+            summary => <<"Get Bridge Metrics">>,
+            description => ?DESC("desc_bridge_metrics"),
+            parameters => [param_path_id()],
+            responses => #{
+                200 => emqx_bridge_schema:metrics_fields(),
+                404 => error_schema('NOT_FOUND', "Bridge not found")
+            }
+        }
+    };
+schema("/bridges/:id/metrics/reset") ->
+    #{
+        'operationId' => '/bridges/:id/metrics/reset',
         put => #{
             tags => [<<"bridges">>],
             summary => <<"Reset Bridge Metrics">>,
@@ -455,7 +474,10 @@ schema("/nodes/:node/bridges/:id/:operation") ->
         end
     ).
 
-'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) ->
+'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)).
+
+'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(
         Id,
         case
@@ -469,10 +491,18 @@ schema("/nodes/:node/bridges/:id/:operation") ->
     ).
 
 lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
+    FormatFun = fun format_bridge_info_without_metrics/1,
+    do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
+
+lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
+    FormatFun = fun format_bridge_metrics/1,
+    do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
+
+do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
     Nodes = mria_mnesia:running_nodes(),
     case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
         {ok, [{ok, _} | _] = Results} ->
-            {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
+            {SuccCode, FormatFun([R || {ok, R} <- Results])};
         {ok, [{error, not_found} | _]} ->
             {404, error_msg('NOT_FOUND', <<"not_found">>)};
         {error, ErrL} ->
@@ -572,7 +602,7 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
     lists:foldl(
         fun(#{type := Type, name := Name}, Acc) ->
             Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
-            [format_bridge_info(Bridges) | Acc]
+            [format_bridge_info_with_metrics(Bridges) | Acc]
         end,
         [],
         BridgesFirstNode
@@ -606,7 +636,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
         BridgesAllNodes
     ).
 
-format_bridge_info([FirstBridge | _] = Bridges) ->
+format_bridge_info_with_metrics([FirstBridge | _] = Bridges) ->
     Res = maps:remove(node, FirstBridge),
     NodeStatus = collect_status(Bridges),
     NodeMetrics = collect_metrics(Bridges),
@@ -617,6 +647,14 @@ format_bridge_info([FirstBridge | _] = Bridges) ->
         node_metrics => NodeMetrics
     }).
 
+format_bridge_info_without_metrics(Bridges) ->
+    Res = format_bridge_info_with_metrics(Bridges),
+    maps:without([metrics, node_metrics], Res).
+
+format_bridge_metrics(Bridges) ->
+    Res = format_bridge_info_with_metrics(Bridges),
+    maps:with([metrics, node_metrics], Res).
+
 collect_status(Bridges) ->
     [maps:with([node, status], B) || B <- Bridges].
 

+ 1 - 1
apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl

@@ -51,7 +51,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("config").
+    emqx_bridge_schema:status_fields() ++ fields("config").
 
 desc("config") ->
     ?DESC("config");

+ 13 - 8
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -30,7 +30,8 @@
 
 -export([
     common_bridge_fields/0,
-    metrics_status_fields/0
+    status_fields/0,
+    metrics_fields/0
 ]).
 
 %%======================================================================================
@@ -83,14 +84,8 @@ common_bridge_fields() ->
             )}
     ].
 
-metrics_status_fields() ->
+status_fields() ->
     [
-        {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})},
-        {"node_metrics",
-            mk(
-                hoconsc:array(ref(?MODULE, "node_metrics")),
-                #{desc => ?DESC("desc_node_metrics")}
-            )},
         {"status", mk(status(), #{desc => ?DESC("desc_status")})},
         {"node_status",
             mk(
@@ -99,6 +94,16 @@ metrics_status_fields() ->
             )}
     ].
 
+metrics_fields() ->
+    [
+        {"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})},
+        {"node_metrics",
+            mk(
+                hoconsc:array(ref(?MODULE, "node_metrics")),
+                #{desc => ?DESC("desc_node_metrics")}
+            )}
+    ].
+
 %%======================================================================================
 %% For config files
 

+ 1 - 1
apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl

@@ -38,7 +38,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+    emqx_bridge_schema:status_fields() ++ fields("post");
 fields("creation_opts") ->
     lists:filter(
         fun({K, _V}) ->

+ 83 - 13
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -187,8 +187,6 @@ t_http_crud_apis(Config) ->
         <<"enable">> := true,
         <<"status">> := _,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
 
@@ -225,8 +223,6 @@ t_http_crud_apis(Config) ->
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
-            <<"metrics">> := _,
-            <<"node_metrics">> := [_ | _],
             <<"url">> := URL2
         },
         jsx:decode(Bridge2)
@@ -259,8 +255,6 @@ t_http_crud_apis(Config) ->
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
-            <<"metrics">> := _,
-            <<"node_metrics">> := [_ | _],
             <<"url">> := URL2
         },
         jsx:decode(Bridge3Str)
@@ -456,8 +450,6 @@ do_start_stop_bridges(Type, Config) ->
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
@@ -502,8 +494,6 @@ t_enable_disable_bridges(Config) ->
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
@@ -555,12 +545,10 @@ t_reset_bridges(Config) ->
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
-        <<"metrics">> := _,
-        <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
-    {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []),
+    {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
 
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
@@ -599,6 +587,88 @@ t_with_redact_update(_Config) ->
     ?assertEqual(Password, Value),
     ok.
 
+t_metrics(Config) ->
+    Port = ?config(port, Config),
+    %% assert we there's no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a webhook bridge, using POST
+    %% POST /bridges/ will create a bridge
+    URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
+    {ok, 201, Bridge} = request(
+        post,
+        uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
+    ),
+
+    %ct:pal("---bridge: ~p", [Bridge]),
+    #{
+        <<"type">> := ?BRIDGE_TYPE,
+        <<"name">> := Name,
+        <<"enable">> := true,
+        <<"status">> := _,
+        <<"node_status">> := [_ | _],
+        <<"url">> := URL1
+    } = jsx:decode(Bridge),
+
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
+
+    %% check for empty bridge metrics
+    {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
+    ct:pal("HERE ~p", [jsx:decode(Bridge1Str)]),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"success">> := 0},
+            <<"node_metrics">> := [_ | _]
+        },
+        jsx:decode(Bridge1Str)
+    ),
+
+    %% send an message to emqx and the message should be forwarded to the HTTP server
+    Body = <<"my msg">>,
+    emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
+    ?assert(
+        receive
+            {http_server, received, #{
+                method := <<"POST">>,
+                path := <<"/path1">>,
+                body := Body
+            }} ->
+                true;
+            Msg ->
+                ct:pal("error: http got unexpected request: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end
+    ),
+
+    %% check for non-empty bridge metrics
+    {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
+    ct:pal("HERE ~p", [jsx:decode(Bridge2Str)]),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"success">> := 1},
+            <<"node_metrics">> := [_ | _]
+        },
+        jsx:decode(Bridge2Str)
+    ),
+
+    %% check for non-empty metrics when listing all bridges
+    {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []),
+    ct:pal("HERE ~p", [jsx:decode(BridgesStr)]),
+    ?assertMatch(
+        [
+            #{
+                <<"metrics">> := #{<<"success">> := 1},
+                <<"node_metrics">> := [_ | _]
+            }
+        ],
+        jsx:decode(BridgesStr)
+    ),
+    ok.
+
 operation_path(node, Oper, BridgeID) ->
     uri(["nodes", node(), "bridges", BridgeID, Oper]);
 operation_path(cluster, Oper, BridgeID) ->

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl

@@ -124,7 +124,7 @@ fields(bridge_config) ->
             )}
     ];
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+    emqx_bridge_schema:status_fields() ++ fields("post");
 fields("post") ->
     [type_field(), name_field() | fields("config")];
 fields("put") ->

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl

@@ -67,7 +67,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post").
 
 field(connector) ->
     mk(

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -139,7 +139,7 @@ method_fileds(get, ConnectorType) ->
     influxdb_bridge_common_fields() ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
-        emqx_bridge_schema:metrics_status_fields();
+        emqx_bridge_schema:status_fields();
 method_fileds(put, ConnectorType) ->
     influxdb_bridge_common_fields() ++
         connector_fields(ConnectorType).

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -67,7 +67,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+    emqx_bridge_schema:status_fields() ++ fields("post");
 fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},

+ 3 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl

@@ -59,15 +59,15 @@ fields("put_sharded") ->
 fields("put_single") ->
     fields(mongodb_single);
 fields("get_rs") ->
-    emqx_bridge_schema:metrics_status_fields() ++
+    emqx_bridge_schema:status_fields() ++
         fields(mongodb_rs) ++
         type_and_name_fields(mongodb_rs);
 fields("get_sharded") ->
-    emqx_bridge_schema:metrics_status_fields() ++
+    emqx_bridge_schema:status_fields() ++
         fields(mongodb_sharded) ++
         type_and_name_fields(mongodb_sharded);
 fields("get_single") ->
-    emqx_bridge_schema:metrics_status_fields() ++
+    emqx_bridge_schema:status_fields() ++
         fields(mongodb_single) ++
         type_and_name_fields(mongodb_single).
 

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl

@@ -104,7 +104,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post").
 
 desc("config") ->
     ?DESC("desc_config");

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl

@@ -106,7 +106,7 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+    emqx_bridge_schema:status_fields() ++ fields("post").
 
 fields("post", Type) ->
     [type_field(Type), name_field() | fields("config")].

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl

@@ -126,7 +126,7 @@ method_fileds(get, ConnectorType) ->
     redis_bridge_common_fields() ++
         connector_fields(ConnectorType) ++
         type_name_fields(ConnectorType) ++
-        emqx_bridge_schema:metrics_status_fields();
+        emqx_bridge_schema:status_fields();
 method_fileds(put, ConnectorType) ->
     redis_bridge_common_fields() ++
         connector_fields(ConnectorType).