|
|
@@ -73,20 +73,23 @@
|
|
|
|
|
|
fetch_from_local_node(Mode) ->
|
|
|
Rules = emqx_rule_engine:get_rules(),
|
|
|
- Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS),
|
|
|
+ BridgesV1 = emqx:get_config([bridges], #{}),
|
|
|
+ BridgeV2Actions = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS),
|
|
|
Connectors = emqx_connector:list(),
|
|
|
{node(self()), #{
|
|
|
rule_metric_data => rule_metric_data(Mode, Rules),
|
|
|
- action_metric_data => action_metric_data(Mode, Bridges),
|
|
|
- connector_metric_data => connector_metric_data(Mode, Connectors)
|
|
|
+ action_metric_data => action_metric_data(Mode, BridgeV2Actions),
|
|
|
+ connector_metric_data => connector_metric_data(Mode, BridgesV1, Connectors)
|
|
|
}}.
|
|
|
|
|
|
fetch_cluster_consistented_data() ->
|
|
|
Rules = emqx_rule_engine:get_rules(),
|
|
|
+ %% for bridge v1
|
|
|
+ BridgesV1 = emqx:get_config([bridges], #{}),
|
|
|
Connectors = emqx_connector:list(),
|
|
|
(maybe_collect_schema_registry())#{
|
|
|
rules_ov_data => rules_ov_data(Rules),
|
|
|
- connectors_ov_data => connectors_ov_data(Connectors)
|
|
|
+ connectors_ov_data => connectors_ov_data(BridgesV1, Connectors)
|
|
|
}.
|
|
|
|
|
|
aggre_or_zip_init_acc() ->
|
|
|
@@ -144,9 +147,13 @@ collect_mf(_, _) ->
|
|
|
collect(<<"json">>) ->
|
|
|
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
|
|
Rules = emqx_rule_engine:get_rules(),
|
|
|
- Bridges = emqx_bridge:list(),
|
|
|
+ Connectors = emqx_connector:list(),
|
|
|
+ %% for bridge v1
|
|
|
+ BridgesV1 = emqx:get_config([bridges], #{}),
|
|
|
#{
|
|
|
- data_integration_overview => collect_data_integration_overview(Rules, Bridges),
|
|
|
+ data_integration_overview => collect_data_integration_overview(
|
|
|
+ Rules, BridgesV1, Connectors
|
|
|
+ ),
|
|
|
rules => collect_json_data(?MG(rule_metric_data, RawData)),
|
|
|
actions => collect_json_data(?MG(action_metric_data, RawData)),
|
|
|
connectors => collect_json_data(?MG(connector_metric_data, RawData))
|
|
|
@@ -205,6 +212,8 @@ collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> counter_metrics
|
|
|
collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data));
|
|
|
%%====================
|
|
|
%% Action Metric
|
|
|
+collect_di(K = emqx_action_enable, Data) -> gauge_metrics(?MG(K, Data));
|
|
|
+collect_di(K = emqx_action_status, Data) -> gauge_metrics(?MG(K, Data));
|
|
|
collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data));
|
|
|
collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data));
|
|
|
collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data));
|
|
|
@@ -296,10 +305,17 @@ connectors_ov_metric_meta() ->
|
|
|
connectors_ov_metric(names) ->
|
|
|
emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
|
|
|
|
|
|
-connectors_ov_data(Connectors) ->
|
|
|
+connectors_ov_data(BridgesV1, Connectors) ->
|
|
|
+ %% Both Bridge V1 and V2
|
|
|
+ V1ConnectorsCnt = maps:fold(
|
|
|
+ fun(_Type, NameAndConf, AccIn) ->
|
|
|
+ AccIn + maps:size(NameAndConf)
|
|
|
+ end,
|
|
|
+ 0,
|
|
|
+ BridgesV1
|
|
|
+ ),
|
|
|
#{
|
|
|
- %% Both Bridge V1 and V2
|
|
|
- emqx_connectors_count => erlang:length(Connectors)
|
|
|
+ emqx_connectors_count => erlang:length(Connectors) + V1ConnectorsCnt
|
|
|
}.
|
|
|
|
|
|
%%========================================
|
|
|
@@ -375,6 +391,8 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
|
|
|
|
|
|
action_metric_meta() ->
|
|
|
[
|
|
|
+ {emqx_action_enable, gauge},
|
|
|
+ {emqx_action_status, gauge},
|
|
|
{emqx_action_matched, counter},
|
|
|
{emqx_action_dropped, counter},
|
|
|
{emqx_action_success, counter},
|
|
|
@@ -398,9 +416,11 @@ action_metric(names) ->
|
|
|
|
|
|
action_metric_data(Mode, Bridges) ->
|
|
|
lists:foldl(
|
|
|
- fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
|
|
|
+ fun(#{type := Type, name := Name} = Action, AccIn) ->
|
|
|
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
|
|
- merge_acc_with_bridges(Mode, Id, get_bridge_metric(Type, Name), AccIn)
|
|
|
+ Status = get_action_status(Action),
|
|
|
+ Metrics = get_action_metric(Type, Name),
|
|
|
+ merge_acc_with_bridges(Mode, Id, maps:merge(Status, Metrics), AccIn)
|
|
|
end,
|
|
|
maps:from_keys(action_metric(names), []),
|
|
|
Bridges
|
|
|
@@ -415,10 +435,18 @@ merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) ->
|
|
|
BridgeMetrics
|
|
|
).
|
|
|
|
|
|
+get_action_status(#{resource_data := ResourceData} = _Action) ->
|
|
|
+ Enable = emqx_utils_maps:deep_get([config, enable], ResourceData),
|
|
|
+ Status = ?MG(status, ResourceData),
|
|
|
+ #{
|
|
|
+ emqx_action_enable => emqx_prometheus_cluster:boolean_to_number(Enable),
|
|
|
+ emqx_action_status => emqx_prometheus_cluster:status_to_number(Status)
|
|
|
+ }.
|
|
|
+
|
|
|
action_point(Mode, Id, V) ->
|
|
|
{with_node_label(Mode, [{id, Id}]), V}.
|
|
|
|
|
|
-get_bridge_metric(Type, Name) ->
|
|
|
+get_action_metric(Type, Name) ->
|
|
|
#{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name),
|
|
|
#{
|
|
|
emqx_action_matched => ?MG0(matched, Counters),
|
|
|
@@ -452,16 +480,44 @@ connector_metric_meta() ->
|
|
|
connectr_metric(names) ->
|
|
|
emqx_prometheus_cluster:metric_names(connector_metric_meta()).
|
|
|
|
|
|
-connector_metric_data(Mode, Connectors) ->
|
|
|
+connector_metric_data(Mode, BridgesV1, Connectors) ->
|
|
|
+ AccIn = maps:from_keys(connectr_metric(names), []),
|
|
|
+ Acc0 = connector_metric_data_v1(Mode, BridgesV1, AccIn),
|
|
|
+ _AccOut = connector_metric_data_v2(Mode, Connectors, Acc0).
|
|
|
+
|
|
|
+connector_metric_data_v2(Mode, Connectors, InitAcc) ->
|
|
|
lists:foldl(
|
|
|
- fun(#{type := Type, name := Name} = Connector, AccIn) ->
|
|
|
+ fun(#{type := Type, name := Name, resource_data := ResourceData} = _Connector, AccIn) ->
|
|
|
Id = emqx_connector_resource:connector_id(Type, Name),
|
|
|
- merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn)
|
|
|
+ merge_acc_with_connectors(Mode, Id, get_connector_status(ResourceData), AccIn)
|
|
|
end,
|
|
|
- maps:from_keys(connectr_metric(names), []),
|
|
|
+ InitAcc,
|
|
|
Connectors
|
|
|
).
|
|
|
|
|
|
+connector_metric_data_v1(Mode, BridgesV1, InitAcc) ->
|
|
|
+ maps:fold(
|
|
|
+ fun(Type, NameAndConfMap, Acc0) ->
|
|
|
+ maps:fold(
|
|
|
+ fun(Name, _Conf, Acc1) ->
|
|
|
+ BridgeV1Id = emqx_bridge_resource:resource_id(Type, Name),
|
|
|
+ case emqx_resource:get_instance(BridgeV1Id) of
|
|
|
+ {error, not_found} ->
|
|
|
+ Acc1;
|
|
|
+ {ok, _, ResourceData} ->
|
|
|
+ merge_acc_with_connectors(
|
|
|
+ Mode, BridgeV1Id, get_connector_status(ResourceData), Acc1
|
|
|
+ )
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ Acc0,
|
|
|
+ NameAndConfMap
|
|
|
+ )
|
|
|
+ end,
|
|
|
+ InitAcc,
|
|
|
+ BridgesV1
|
|
|
+ ).
|
|
|
+
|
|
|
merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
|
|
|
maps:fold(
|
|
|
fun(K, V, AccIn) ->
|
|
|
@@ -474,7 +530,7 @@ merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
|
|
|
connector_point(Mode, Id, V) ->
|
|
|
{with_node_label(Mode, [{id, Id}]), V}.
|
|
|
|
|
|
-get_connector_status(#{resource_data := ResourceData} = _Connector) ->
|
|
|
+get_connector_status(ResourceData) ->
|
|
|
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
|
|
|
Status = ?MG(status, ResourceData),
|
|
|
#{
|
|
|
@@ -488,9 +544,9 @@ get_connector_status(#{resource_data := ResourceData} = _Connector) ->
|
|
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
%% merge / zip formatting funcs for type `application/json`
|
|
|
-collect_data_integration_overview(Rules, Bridges) ->
|
|
|
+collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
|
|
|
RulesD = rules_ov_data(Rules),
|
|
|
- ConnectorsD = connectors_ov_data(Bridges),
|
|
|
+ ConnectorsD = connectors_ov_data(BridgesV1, Connectors),
|
|
|
|
|
|
M1 = lists:foldl(
|
|
|
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
|