Просмотр исходного кода

Merge pull request #12033 from thalesmg/action-metrics-api-r54-20231127

feat: add `/actions/:id/metrics/`, `/actions/:id/metrics/reset` APIs
Thales Macedo Garitezi 2 года назад
Родитель
Коммит
e0f873f032

+ 184 - 2
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -21,6 +21,7 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_utils/include/emqx_utils_api.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
 
 -import(hoconsc, [mk/2, array/1, enum/1]).
 -import(emqx_utils, [redact/1]).
@@ -37,6 +38,8 @@
 -export([
     '/actions'/2,
     '/actions/:id'/2,
+    '/actions/:id/metrics'/2,
+    '/actions/:id/metrics/reset'/2,
     '/actions/:id/enable/:enable'/2,
     '/actions/:id/:operation'/2,
     '/nodes/:node/actions/:id/:operation'/2,
@@ -44,8 +47,8 @@
     '/action_types'/2
 ]).
 
-%% BpAPI
--export([lookup_from_local_node/2]).
+%% BpAPI / RPC Targets
+-export([lookup_from_local_node/2, get_metrics_from_local_node/2]).
 
 -define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME),
     ?NOT_FOUND(
@@ -80,6 +83,10 @@ paths() ->
         "/actions/:id/enable/:enable",
         "/actions/:id/:operation",
         "/nodes/:node/actions/:id/:operation",
+        %% Caveat: metrics paths must come *after* `/:operation', otherwise minirest will
+        %% try to match the latter first, trying to interpret `metrics' as an operation...
+        "/actions/:id/metrics",
+        "/actions/:id/metrics/reset",
         "/actions_probe",
         "/action_types"
     ].
@@ -247,6 +254,34 @@ schema("/actions/:id") ->
             }
         }
     };
+%% Schema of the per-action metrics endpoint. Only GET is declared; it takes
+%% the action id as a path parameter and answers with either 200 or 404.
+schema("/actions/:id/metrics") ->
+    GetOperation = #{
+        tags => [<<"actions">>],
+        summary => <<"Get action metrics">>,
+        description => ?DESC("desc_bridge_metrics"),
+        parameters => [param_path_id()],
+        responses => #{
+            200 => emqx_bridge_schema:metrics_fields(),
+            404 => error_schema('NOT_FOUND', "Action not found")
+        }
+    },
+    #{'operationId' => '/actions/:id/metrics', get => GetOperation};
+%% Schema of the metrics-reset endpoint. Only PUT is declared; it takes the
+%% action id as a path parameter and answers with either 204 or 404.
+schema("/actions/:id/metrics/reset") ->
+    PutOperation = #{
+        tags => [<<"actions">>],
+        summary => <<"Reset action metrics">>,
+        description => ?DESC("desc_api6"),
+        parameters => [param_path_id()],
+        responses => #{
+            204 => <<"Reset success">>,
+            404 => error_schema('NOT_FOUND', "Action not found")
+        }
+    },
+    #{'operationId' => '/actions/:id/metrics/reset', put => PutOperation};
 schema("/actions/:id/enable/:enable") ->
     #{
         'operationId' => '/actions/:id/enable/:enable',
@@ -429,6 +464,19 @@ schema("/action_types") ->
         end
     ).
 
+%% GET handler: returns the metrics of the action identified by `Id', as
+%% gathered by get_metrics_from_all_nodes/2.
+%% BridgeType and BridgeName are not bound in this clause; presumably they are
+%% introduced by ?TRY_PARSE_ID -- confirm against the macro definition.
+'/actions/:id/metrics'(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(
+        Id,
+        get_metrics_from_all_nodes(BridgeType, BridgeName)
+    ).
+
+%% PUT handler: resets the metrics of the action identified by `Id' and
+%% replies with ?NO_CONTENT.
+%% BridgeType and BridgeName are not bound in this clause; presumably they are
+%% introduced by ?TRY_PARSE_ID -- confirm against the macro definition.
+'/actions/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(
+        Id,
+        begin
+            %% NOTE(review): reset_metrics/2 receives the result of
+            %% bridge_v2_type_to_connector_type/1, whereas the metrics getter
+            %% is given the action type unchanged -- confirm this is intended.
+            ConnectorType = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType),
+            ok = emqx_bridge_v2:reset_metrics(ConnectorType, BridgeName),
+            ?NO_CONTENT
+        end
+    ).
+
 '/actions/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
     ?TRY_PARSE_ID(
         Id,
@@ -570,6 +618,18 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
             ?INTERNAL_ERROR(Reason)
     end.
 
+%% Collects the metrics of one action from every running node.
+%% Returns `{200, Body}' when the unwrapped RPC result is a list (its entries
+%% are zipped with the node list before formatting), and
+%% ?INTERNAL_ERROR(Reason) when it is `{error, Reason}'.
+get_metrics_from_all_nodes(ActionType, ActionName) ->
+    RunningNodes = emqx:running_nodes(),
+    RpcResult = emqx_bridge_proto_v5:v2_get_metrics_from_all_nodes(
+        RunningNodes, ActionType, ActionName
+    ),
+    case maybe_unwrap(RpcResult) of
+        {error, Reason} ->
+            ?INTERNAL_ERROR(Reason);
+        PerNode when is_list(PerNode) ->
+            {200, format_bridge_metrics(lists:zip(RunningNodes, PerNode))}
+    end.
+
 %% Picks, for the `start' operation, the atom naming the function to use:
 %% one variant when the target is `all', another for any other target.
 operation_func(all, start) ->
     v2_start_bridge_to_all_nodes;
 operation_func(_SingleNode, start) ->
     v2_start_bridge_to_node.
 
@@ -720,12 +780,17 @@ aggregate_status(AllStatus) ->
         false -> inconsistent
     end.
 
+%% RPC Target
 %% Looks the bridge up on the local node. A successful lookup is formatted
 %% with format_resource/2 together with node(); any other result is returned
 %% unchanged.
 lookup_from_local_node(BridgeType, BridgeName) ->
     LookupResult = emqx_bridge_v2:lookup(BridgeType, BridgeName),
     case LookupResult of
         {ok, Resource} ->
             {ok, format_resource(Resource, node())};
         NotOk ->
             NotOk
     end.
 
+%% RPC Target
+%% Reads the metrics of the given action via emqx_bridge_v2:get_metrics/2 and
+%% converts them with format_metrics/1.
+get_metrics_from_local_node(ActionType, ActionName) ->
+    RawMetrics = emqx_bridge_v2:get_metrics(ActionType, ActionName),
+    format_metrics(RawMetrics).
+
 %% resource
 format_resource(
     #{
@@ -751,6 +816,123 @@ format_resource(
         )
     ).
 
+%% Converts a raw metrics map into the ?METRICS representation.
+%% The first clause only matches when every listed counter and the `matched'
+%% rate are present; the `queuing' and `inflight' gauges default to 0 when
+%% absent. Any other input yields empty_metrics().
+format_metrics(#{
+    counters := #{
+        'dropped' := Dropped,
+        'dropped.other' := DroppedOther,
+        'dropped.expired' := DroppedExpired,
+        'dropped.queue_full' := DroppedQueueFull,
+        'dropped.resource_not_found' := DroppedNotFound,
+        'dropped.resource_stopped' := DroppedStopped,
+        'matched' := Matched,
+        'retried' := Retried,
+        'late_reply' := LateReply,
+        'failed' := Failed,
+        'success' := Success,
+        'received' := Received
+    },
+    gauges := Gauges,
+    rate := #{
+        matched := #{current := RateNow, last5m := RateLast5m, max := RateMax}
+    }
+}) ->
+    Queuing = maps:get('queuing', Gauges, 0),
+    Inflight = maps:get('inflight', Gauges, 0),
+    %% ?METRICS takes its arguments positionally: keep this order unchanged.
+    ?METRICS(
+        Dropped,
+        DroppedOther,
+        DroppedExpired,
+        DroppedQueueFull,
+        DroppedNotFound,
+        DroppedStopped,
+        Matched,
+        Queuing,
+        Retried,
+        LateReply,
+        Failed,
+        Inflight,
+        Success,
+        RateNow,
+        RateLast5m,
+        RateMax,
+        Received
+    );
+format_metrics(_Other) ->
+    %% Empty metrics: can happen when a node joins another and a
+    %% bridge is not yet replicated to it, so the counters map is
+    %% empty.
+    empty_metrics().
+
+%% Builds a ?METRICS value in which each of the 17 positional fields is 0.
+empty_metrics() ->
+    ?METRICS(
+        %% dropped: total, other, expired, queue full, resource not found,
+        %% resource stopped
+        0, 0, 0, 0, 0, 0,
+        %% matched, queued, retried, late reply
+        0, 0, 0, 0,
+        %% sent: failed, inflight, success
+        0, 0, 0,
+        %% rate: current, last 5 minutes, max
+        0, 0, 0,
+        %% received
+        0
+    ).
+
+%% Builds the body of the metrics response from a list of `{Node, Result}'
+%% pairs. A map result is used as that node's metrics; any other result is
+%% replaced with empty_metrics(). The returned map carries the per-node
+%% entries under `node_metrics' and their aggregate under `metrics'.
+format_bridge_metrics(Bridges) ->
+    %% Every pair produces exactly one entry (nothing is filtered out), so a
+    %% plain lists:map/2 is the right tool here rather than lists:filtermap/2.
+    NodeMetrics = lists:map(
+        fun
+            ({Node, Metrics}) when is_map(Metrics) ->
+                #{node => Node, metrics => Metrics};
+            ({Node, _}) ->
+                #{node => Node, metrics => empty_metrics()}
+        end,
+        Bridges
+    ),
+    #{
+        metrics => aggregate_metrics(NodeMetrics),
+        node_metrics => NodeMetrics
+    }.
+
+%% Combines a list of per-node metric entries into a single value by folding
+%% aggregate_metrics/2 over them, starting from ?EMPTY_METRICS.
+aggregate_metrics(AllMetrics) ->
+    lists:foldl(fun aggregate_metrics/2, ?EMPTY_METRICS, AllMetrics).
+
+%% Fold step for aggregate_metrics/1: adds one node's metrics (V1..V17) to
+%% the running total (T1..T17), position by position. Both arguments are
+%% destructured with ?metrics, so the 17 positions must stay aligned with the
+%% ?METRICS constructor.
+aggregate_metrics(
+    #{
+        metrics := ?metrics(
+            V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17
+        )
+    },
+    ?metrics(
+        T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17
+    )
+) ->
+    ?METRICS(
+        V1 + T1,
+        V2 + T2,
+        V3 + T3,
+        V4 + T4,
+        V5 + T5,
+        V6 + T6,
+        V7 + T7,
+        V8 + T8,
+        V9 + T9,
+        V10 + T10,
+        V11 + T11,
+        V12 + T12,
+        V13 + T13,
+        V14 + T14,
+        V15 + T15,
+        V16 + T16,
+        V17 + T17
+    ).
+
 %% Keeps only the `status' and `error' keys of Data and folds them into a
 %% fresh map with format_resource_data/3.
 format_bridge_status_and_error(Data) ->
     StatusAndError = maps:with([status, error], Data),
     maps:fold(fun format_resource_data/3, #{}, StatusAndError).
 

+ 13 - 1
apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl

@@ -34,7 +34,8 @@
     v2_start_bridge_to_node/3,
     v2_start_bridge_to_all_nodes/3,
     v2_list_bridges_on_nodes/1,
-    v2_lookup_from_all_nodes/3
+    v2_lookup_from_all_nodes/3,
+    v2_get_metrics_from_all_nodes/3
 ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -156,6 +157,17 @@ v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
         ?TIMEOUT
     ).
 
+%% Calls emqx_bridge_v2_api:get_metrics_from_local_node(ActionType, ActionName)
+%% on every node in `Nodes' through erpc:multicall/5, bounded by ?TIMEOUT.
+%% The multicall result is returned as is; callers are expected to unwrap it.
+-spec v2_get_metrics_from_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_v2_api,
+        get_metrics_from_local_node,
+        [ActionType, ActionName],
+        ?TIMEOUT
+    ).
+
 -spec v2_start_bridge_to_all_nodes([node()], key(), key()) ->
     emqx_rpc:erpc_multicall().
 v2_start_bridge_to_all_nodes(Nodes, BridgeType, BridgeName) ->

+ 144 - 48
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -56,6 +56,7 @@
 -define(CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
 -define(CONNECTOR, ?CONNECTOR(?CONNECTOR_NAME)).
 
+-define(MQTT_LOCAL_TOPIC, <<"mqtt/local/topic">>).
 -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
 -define(BRIDGE_TYPE_STR, "kafka_producer").
 -define(BRIDGE_TYPE, <<?BRIDGE_TYPE_STR>>).
@@ -93,7 +94,7 @@
         <<"required_acks">> => <<"all_isr">>,
         <<"topic">> => <<"kafka-topic">>
     },
-    <<"local_topic">> => <<"mqtt/local/topic">>,
+    <<"local_topic">> => ?MQTT_LOCAL_TOPIC,
     <<"resource_opts">> => #{
         <<"health_check_interval">> => <<"32s">>
     }
@@ -105,48 +106,6 @@
 ).
 -define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?CONNECTOR_NAME)).
 
-%% -define(BRIDGE_TYPE_MQTT, <<"mqtt">>).
-%% -define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{
-%%     <<"server">> => SERVER,
-%%     <<"username">> => <<"user1">>,
-%%     <<"password">> => <<"">>,
-%%     <<"proto_ver">> => <<"v5">>,
-%%     <<"egress">> => #{
-%%         <<"remote">> => #{
-%%             <<"topic">> => <<"emqx/${topic}">>,
-%%             <<"qos">> => <<"${qos}">>,
-%%             <<"retain">> => false
-%%         }
-%%     }
-%% }).
-%% -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
-
-%% -define(BRIDGE_TYPE_HTTP, <<"kafka">>).
-%% -define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{
-%%     <<"url">> => URL,
-%%     <<"local_topic">> => <<"emqx_webhook/#">>,
-%%     <<"method">> => <<"post">>,
-%%     <<"body">> => <<"${payload}">>,
-%%     <<"headers">> => #{
-%%         % NOTE
-%%         % The Pascal-Case is important here.
-%%         % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts
-%%         % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS
-%%         % when this happens (while the 'content-type' does not).
-%%         <<"Content-Type">> => <<"application/json">>
-%%     }
-%% }).
-%% -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
-
-%% -define(URL(PORT, PATH),
-%%         list_to_binary(
-%%           io_lib:format(
-%%             "http://localhost:~s/~s",
-%%             [integer_to_list(PORT), PATH]
-%%            )
-%%          )
-%%        ).
-
 -define(APPSPECS, [
     emqx_conf,
     emqx,
@@ -166,7 +125,7 @@
 all() ->
     [
         {group, single},
-        %{group, cluster_later_join},
+        {group, cluster_later_join},
         {group, cluster}
     ].
 -else.
@@ -182,7 +141,7 @@ groups() ->
         t_fix_broken_bridge_config
     ],
     ClusterLaterJoinOnlyTCs = [
-        % t_cluster_later_join_metrics
+        t_cluster_later_join_metrics
     ],
     [
         {single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
@@ -202,9 +161,9 @@ end_per_suite(_Config) ->
 init_per_group(cluster = Name, Config) ->
     Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
     init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
-%% init_per_group(cluster_later_join = Name, Config) ->
-%%     Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
-%%     init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
+init_per_group(cluster_later_join = Name, Config) ->
+    Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
+    init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
 init_per_group(Name, Config) ->
     WorkDir = filename:join(?config(priv_dir, Config), Name),
     Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
@@ -1041,6 +1000,143 @@ t_bad_name(Config) ->
     ),
     ok.
 
+%% Exercises GET `<root>/:id/metrics' end to end: `matched' starts at 0, turns
+%% into 1 after one message is published to ?MQTT_LOCAL_TOPIC, and metrics are
+%% not embedded in the single-action response nor in the listing.
+t_metrics(Config) ->
+    %% pre-condition: the listing is empty
+    {ok, 200, []} = request_json(get, uri([?ROOT]), Config),
+
+    %% ?BRIDGE_NAME expands to the name of the enclosing function, so the
+    %% action created below is named after this test case.
+    ActionName = ?BRIDGE_NAME,
+    ?assertMatch(
+        {ok, 201, _},
+        request_json(
+            post,
+            uri([?ROOT]),
+            ?KAFKA_BRIDGE(?BRIDGE_NAME),
+            Config
+        )
+    ),
+
+    ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName),
+
+    %% freshly created: aggregate and per-node `matched' are both 0
+    ?assertMatch(
+        {ok, 200, #{
+            <<"metrics">> := #{<<"matched">> := 0},
+            <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 0}} | _]
+        }},
+        request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
+    ),
+
+    %% the single-action response must not carry any metrics
+    {ok, 200, Bridge} = request_json(get, uri([?ROOT, ActionID]), Config),
+    ?assertNot(maps:is_key(<<"metrics">>, Bridge)),
+    ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)),
+
+    Body = <<"my msg">>,
+    _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config),
+
+    %% check for non-empty bridge metrics
+    ?retry(
+        _Sleep0 = 200,
+        _Retries0 = 20,
+        ?assertMatch(
+            {ok, 200, #{
+                <<"metrics">> := #{<<"matched">> := 1},
+                <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 1}} | _]
+            }},
+            request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
+        )
+    ),
+
+    %% check for absence of metrics when listing all bridges
+    {ok, 200, Bridges} = request_json(get, uri([?ROOT]), Config),
+    ?assertNotMatch(
+        [
+            #{
+                <<"metrics">> := #{},
+                <<"node_metrics">> := [_ | _]
+            }
+        ],
+        Bridges
+    ),
+    ok.
+
+%% Exercises PUT `<root>/:id/metrics/reset': after one published message
+%% `matched' reaches 1, and after the reset request (expected to answer 204
+%% with an empty body) it goes back to 0.
+t_reset_metrics(Config) ->
+    %% assert there's no bridges at first
+    {ok, 200, []} = request_json(get, uri([?ROOT]), Config),
+
+    %% ?BRIDGE_NAME expands to the name of the enclosing function, so the
+    %% action created below is named after this test case.
+    ActionName = ?BRIDGE_NAME,
+    ?assertMatch(
+        {ok, 201, _},
+        request_json(
+            post,
+            uri([?ROOT]),
+            ?KAFKA_BRIDGE(?BRIDGE_NAME),
+            Config
+        )
+    ),
+    ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName),
+
+    Body = <<"my msg">>,
+    _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config),
+    %% wait until the published message shows up in the aggregate metrics
+    ?retry(
+        _Sleep0 = 200,
+        _Retries0 = 20,
+        ?assertMatch(
+            {ok, 200, #{
+                <<"metrics">> := #{<<"matched">> := 1},
+                <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
+            }},
+            request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
+        )
+    ),
+
+    {ok, 204, <<>>} = request(put, uri([?ROOT, ActionID, "metrics", "reset"]), Config),
+
+    %% after the reset the aggregate `matched' counter must read 0 again
+    ?retry(
+        _Sleep0 = 200,
+        _Retries0 = 20,
+        ?assertMatch(
+            {ok, 200, #{
+                <<"metrics">> := #{<<"matched">> := 0},
+                <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
+            }},
+            request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
+        )
+    ),
+
+    ok.
+
+%% Checks that the metrics endpoint keeps answering 200 right after a second
+%% node joins the node serving the API, and that it then reports at least two
+%% `node_metrics' entries.
+%% NOTE(review): presumably the nodes of this group start un-clustered (see
+%% `join_to => undefined' in init_per_group/2) -- confirm.
+t_cluster_later_join_metrics(Config) ->
+    [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config),
+    Name = ?BRIDGE_NAME,
+    ActionParams = ?KAFKA_BRIDGE(Name),
+    ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
+    ?check_trace(
+        begin
+            %% Create a bridge on only one of the nodes.
+            ?assertMatch({ok, 201, _}, request_json(post, uri([?ROOT]), ActionParams, Config)),
+            %% Pre-condition.
+            ?assertMatch(
+                {ok, 200, #{
+                    <<"metrics">> := #{<<"success">> := _},
+                    <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
+                }},
+                request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
+            ),
+            %% Now make the other node join the API node.
+            ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
+            %% Check metrics; shouldn't crash even if the bridge is not
+            %% ready on the node that just joined the cluster.
+            ?assertMatch(
+                {ok, 200, #{
+                    <<"metrics">> := #{<<"success">> := _},
+                    <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _]
+                }},
+                request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%% helpers
 listen_on_random_port() ->
     SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],