|
|
@@ -46,6 +46,7 @@
|
|
|
]).
|
|
|
|
|
|
-export([lookup_from_local_node/2]).
|
|
|
+-export([get_metrics_from_local_node/2]).
|
|
|
|
|
|
-define(BRIDGE_NOT_ENABLED,
|
|
|
?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>)
|
|
|
@@ -467,7 +468,7 @@ schema("/bridges_probe") ->
|
|
|
end;
|
|
|
'/bridges'(get, _Params) ->
|
|
|
Nodes = mria:running_nodes(),
|
|
|
- NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes),
|
|
|
+ NodeReplies = emqx_bridge_proto_v4:list_bridges_on_nodes(Nodes),
|
|
|
case is_ok(NodeReplies) of
|
|
|
{ok, NodeBridges} ->
|
|
|
AllBridges = [
|
|
|
@@ -524,7 +525,7 @@ schema("/bridges_probe") ->
|
|
|
).
|
|
|
|
|
|
'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) ->
|
|
|
- ?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)).
|
|
|
+ ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(BridgeType, BridgeName)).
|
|
|
|
|
|
'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
|
|
|
?TRY_PARSE_ID(
|
|
|
@@ -564,19 +565,21 @@ maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeN
|
|
|
maybe_deobfuscate_bridge_probe(Params) ->
|
|
|
Params.
|
|
|
|
|
|
-lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
|
|
- FormatFun = fun format_bridge_info/1,
|
|
|
- do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
|
|
|
-
|
|
|
-lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
|
|
|
- FormatFun = fun format_bridge_metrics/1,
|
|
|
- do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
|
|
|
+get_metrics_from_all_nodes(BridgeType, BridgeName) ->
|
|
|
+ Nodes = mria:running_nodes(),
|
|
|
+ Result = do_bpapi_call(all, get_metrics_from_all_nodes, [Nodes, BridgeType, BridgeName]),
|
|
|
+ case Result of
|
|
|
+ Metrics when is_list(Metrics) ->
|
|
|
+ {200, format_bridge_metrics(lists:zip(Nodes, Metrics))};
|
|
|
+ {error, Reason} ->
|
|
|
+ ?INTERNAL_ERROR(Reason)
|
|
|
+ end.
|
|
|
|
|
|
-do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
|
|
|
+lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
|
|
Nodes = mria:running_nodes(),
|
|
|
- case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
|
|
+ case is_ok(emqx_bridge_proto_v4:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
|
|
{ok, [{ok, _} | _] = Results} ->
|
|
|
- {SuccCode, FormatFun([R || {ok, R} <- Results])};
|
|
|
+ {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
|
|
{ok, [{error, not_found} | _]} ->
|
|
|
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
|
|
{error, Reason} ->
|
|
|
@@ -603,6 +606,9 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
|
|
|
?BAD_REQUEST(map_to_json(Reason))
|
|
|
end.
|
|
|
|
|
|
+get_metrics_from_local_node(BridgeType, BridgeName) ->
|
|
|
+ format_metrics(emqx_bridge:get_metrics(BridgeType, BridgeName)).
|
|
|
+
|
|
|
'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
|
|
|
?TRY_PARSE_ID(
|
|
|
Id,
|
|
|
@@ -739,7 +745,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
|
|
|
).
|
|
|
|
|
|
format_bridge_info([FirstBridge | _] = Bridges) ->
|
|
|
- Res = maps:without([node, metrics], FirstBridge),
|
|
|
+ Res = maps:remove(node, FirstBridge),
|
|
|
NodeStatus = node_status(Bridges),
|
|
|
redact(Res#{
|
|
|
status => aggregate_status(NodeStatus),
|
|
|
@@ -766,7 +772,7 @@ aggregate_status(AllStatus) ->
|
|
|
end.
|
|
|
|
|
|
collect_metrics(Bridges) ->
|
|
|
- [maps:with([node, metrics], B) || B <- Bridges].
|
|
|
+ [#{node => Node, metrics => Metrics} || {Node, Metrics} <- Bridges].
|
|
|
|
|
|
aggregate_metrics(AllMetrics) ->
|
|
|
InitMetrics = ?EMPTY_METRICS,
|
|
|
@@ -800,9 +806,7 @@ aggregate_metrics(
|
|
|
M15 + N15,
|
|
|
M16 + N16,
|
|
|
M17 + N17
|
|
|
- );
|
|
|
-aggregate_metrics(#{}, Metrics) ->
|
|
|
- Metrics.
|
|
|
+ ).
|
|
|
|
|
|
format_resource(
|
|
|
#{
|
|
|
@@ -826,62 +830,56 @@ format_resource(
|
|
|
).
|
|
|
|
|
|
format_resource_data(ResData) ->
|
|
|
- maps:fold(fun format_resource_data/3, #{}, maps:with([status, metrics, error], ResData)).
|
|
|
+ maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)).
|
|
|
|
|
|
format_resource_data(error, undefined, Result) ->
|
|
|
Result;
|
|
|
format_resource_data(error, Error, Result) ->
|
|
|
Result#{status_reason => emqx_misc:readable_error_msg(Error)};
|
|
|
-format_resource_data(
|
|
|
- metrics,
|
|
|
- #{
|
|
|
- counters := #{
|
|
|
- 'dropped' := Dropped,
|
|
|
- 'dropped.other' := DroppedOther,
|
|
|
- 'dropped.expired' := DroppedExpired,
|
|
|
- 'dropped.queue_full' := DroppedQueueFull,
|
|
|
- 'dropped.resource_not_found' := DroppedResourceNotFound,
|
|
|
- 'dropped.resource_stopped' := DroppedResourceStopped,
|
|
|
- 'matched' := Matched,
|
|
|
- 'retried' := Retried,
|
|
|
- 'late_reply' := LateReply,
|
|
|
- 'failed' := SentFailed,
|
|
|
- 'success' := SentSucc,
|
|
|
- 'received' := Rcvd
|
|
|
- },
|
|
|
- gauges := Gauges,
|
|
|
- rate := #{
|
|
|
- matched := #{current := Rate, last5m := Rate5m, max := RateMax}
|
|
|
- }
|
|
|
+format_resource_data(K, V, Result) ->
|
|
|
+ Result#{K => V}.
|
|
|
+
|
|
|
+format_metrics(#{
|
|
|
+ counters := #{
|
|
|
+ 'dropped' := Dropped,
|
|
|
+ 'dropped.other' := DroppedOther,
|
|
|
+ 'dropped.expired' := DroppedExpired,
|
|
|
+ 'dropped.queue_full' := DroppedQueueFull,
|
|
|
+ 'dropped.resource_not_found' := DroppedResourceNotFound,
|
|
|
+ 'dropped.resource_stopped' := DroppedResourceStopped,
|
|
|
+ 'matched' := Matched,
|
|
|
+ 'retried' := Retried,
|
|
|
+ 'late_reply' := LateReply,
|
|
|
+ 'failed' := SentFailed,
|
|
|
+ 'success' := SentSucc,
|
|
|
+ 'received' := Rcvd
|
|
|
},
|
|
|
- Result
|
|
|
-) ->
|
|
|
+ gauges := Gauges,
|
|
|
+ rate := #{
|
|
|
+ matched := #{current := Rate, last5m := Rate5m, max := RateMax}
|
|
|
+ }
|
|
|
+}) ->
|
|
|
Queued = maps:get('queuing', Gauges, 0),
|
|
|
SentInflight = maps:get('inflight', Gauges, 0),
|
|
|
- Result#{
|
|
|
- metrics =>
|
|
|
- ?METRICS(
|
|
|
- Dropped,
|
|
|
- DroppedOther,
|
|
|
- DroppedExpired,
|
|
|
- DroppedQueueFull,
|
|
|
- DroppedResourceNotFound,
|
|
|
- DroppedResourceStopped,
|
|
|
- Matched,
|
|
|
- Queued,
|
|
|
- Retried,
|
|
|
- LateReply,
|
|
|
- SentFailed,
|
|
|
- SentInflight,
|
|
|
- SentSucc,
|
|
|
- Rate,
|
|
|
- Rate5m,
|
|
|
- RateMax,
|
|
|
- Rcvd
|
|
|
- )
|
|
|
- };
|
|
|
-format_resource_data(K, V, Result) ->
|
|
|
- Result#{K => V}.
|
|
|
+ ?METRICS(
|
|
|
+ Dropped,
|
|
|
+ DroppedOther,
|
|
|
+ DroppedExpired,
|
|
|
+ DroppedQueueFull,
|
|
|
+ DroppedResourceNotFound,
|
|
|
+ DroppedResourceStopped,
|
|
|
+ Matched,
|
|
|
+ Queued,
|
|
|
+ Retried,
|
|
|
+ LateReply,
|
|
|
+ SentFailed,
|
|
|
+ SentInflight,
|
|
|
+ SentSucc,
|
|
|
+ Rate,
|
|
|
+ Rate5m,
|
|
|
+ RateMax,
|
|
|
+ Rcvd
|
|
|
+ ).
|
|
|
|
|
|
fill_defaults(Type, RawConf) ->
|
|
|
PackedConf = pack_bridge_conf(Type, RawConf),
|
|
|
@@ -990,7 +988,7 @@ do_bpapi_call(Node, Call, Args) ->
|
|
|
do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
|
|
|
case lists:member(SupportedVersion, supported_versions(Call)) of
|
|
|
true ->
|
|
|
- apply(emqx_bridge_proto_v3, Call, Args);
|
|
|
+ apply(emqx_bridge_proto_v4, Call, Args);
|
|
|
false ->
|
|
|
{error, not_implemented}
|
|
|
end.
|
|
|
@@ -1000,9 +998,10 @@ maybe_unwrap({error, not_implemented}) ->
|
|
|
maybe_unwrap(RpcMulticallResult) ->
|
|
|
emqx_rpc:unwrap_erpc(RpcMulticallResult).
|
|
|
|
|
|
-supported_versions(start_bridge_to_node) -> [2, 3];
|
|
|
-supported_versions(start_bridges_to_all_nodes) -> [2, 3];
|
|
|
-supported_versions(_Call) -> [1, 2, 3].
|
|
|
+supported_versions(start_bridge_to_node) -> [2, 3, 4];
|
|
|
+supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4];
|
|
|
+supported_versions(get_metrics_from_all_nodes) -> [4];
|
|
|
+supported_versions(_Call) -> [1, 2, 3, 4].
|
|
|
|
|
|
redact(Term) ->
|
|
|
emqx_misc:redact(Term).
|