|
@@ -89,6 +89,7 @@ fetch_cluster_consistented_data() ->
|
|
|
Connectors = emqx_connector:list(),
|
|
Connectors = emqx_connector:list(),
|
|
|
(maybe_collect_schema_registry())#{
|
|
(maybe_collect_schema_registry())#{
|
|
|
rules_ov_data => rules_ov_data(Rules),
|
|
rules_ov_data => rules_ov_data(Rules),
|
|
|
|
|
+ actions_ov_data => actions_ov_data(Rules),
|
|
|
connectors_ov_data => connectors_ov_data(BridgesV1, Connectors)
|
|
connectors_ov_data => connectors_ov_data(BridgesV1, Connectors)
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
@@ -121,9 +122,20 @@ collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
|
|
|
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
|
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
|
|
|
|
|
|
|
%% Data Integration Overview
|
|
%% Data Integration Overview
|
|
|
- ok = add_collect_family(Callback, rules_ov_metric_meta(), ?MG(rules_ov_data, RawData)),
|
|
|
|
|
ok = add_collect_family(
|
|
ok = add_collect_family(
|
|
|
- Callback, connectors_ov_metric_meta(), ?MG(connectors_ov_data, RawData)
|
|
|
|
|
|
|
+ Callback,
|
|
|
|
|
+ rules_ov_metric_meta(),
|
|
|
|
|
+ ?MG(rules_ov_data, RawData)
|
|
|
|
|
+ ),
|
|
|
|
|
+ ok = add_collect_family(
|
|
|
|
|
+ Callback,
|
|
|
|
|
+ actions_ov_metric_meta(),
|
|
|
|
|
+ ?MG(actions_ov_data, RawData)
|
|
|
|
|
+ ),
|
|
|
|
|
+ ok = add_collect_family(
|
|
|
|
|
+ Callback,
|
|
|
|
|
+ connectors_ov_metric_meta(),
|
|
|
|
|
+ ?MG(connectors_ov_data, RawData)
|
|
|
),
|
|
),
|
|
|
ok = maybe_collect_family_schema_registry(Callback),
|
|
ok = maybe_collect_family_schema_registry(Callback),
|
|
|
|
|
|
|
@@ -188,6 +200,9 @@ collect_metrics(Name, Metrics) ->
|
|
|
%% Rules
|
|
%% Rules
|
|
|
collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data));
|
|
collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data));
|
|
|
%%====================
|
|
%%====================
|
|
|
|
|
+%% Actions
|
|
|
|
|
+collect_di(K = emqx_actions_count, Data) -> gauge_metric(?MG(K, Data));
|
|
|
|
|
+%%====================
|
|
|
%% Schema Registry
|
|
%% Schema Registry
|
|
|
collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data));
|
|
collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data));
|
|
|
%%====================
|
|
%%====================
|
|
@@ -262,6 +277,32 @@ rules_ov_data(_Rules) ->
|
|
|
emqx_rules_count => ets:info(?RULE_TAB, size)
|
|
emqx_rules_count => ets:info(?RULE_TAB, size)
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
|
|
+%%====================
|
|
|
|
|
+%% Actions
|
|
|
|
|
+
|
|
|
|
|
+actions_ov_metric_meta() ->
|
|
|
|
|
+ [
|
|
|
|
|
+ {emqx_actions_count, gauge}
|
|
|
|
|
+ ].
|
|
|
|
|
+
|
|
|
|
|
+actions_ov_metric(names) ->
|
|
|
|
|
+ emqx_prometheus_cluster:metric_names(actions_ov_metric_meta()).
|
|
|
|
|
+
|
|
|
|
|
+actions_ov_data(Rules) ->
|
|
|
|
|
+ ActionsCount = lists:foldl(
|
|
|
|
|
+ fun
|
|
|
|
|
+ (#{actions := Actions} = _Rule, AccIn) ->
|
|
|
|
|
+ AccIn + length(Actions);
|
|
|
|
|
+ (_, AccIn) ->
|
|
|
|
|
+ AccIn
|
|
|
|
|
+ end,
|
|
|
|
|
+ 0,
|
|
|
|
|
+ Rules
|
|
|
|
|
+ ),
|
|
|
|
|
+ #{
|
|
|
|
|
+ emqx_actions_count => ActionsCount
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
%%====================
|
|
%%====================
|
|
|
%% Schema Registry
|
|
%% Schema Registry
|
|
|
|
|
|
|
@@ -366,24 +407,21 @@ rule_point(Mode, Id, V) ->
|
|
|
{with_node_label(Mode, [{id, Id}]), V}.
|
|
{with_node_label(Mode, [{id, Id}]), V}.
|
|
|
|
|
|
|
|
get_metric(#{id := Id, enable := Bool} = _Rule) ->
|
|
get_metric(#{id := Id, enable := Bool} = _Rule) ->
|
|
|
- case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
|
|
|
|
|
- #{counters := Counters} ->
|
|
|
|
|
- #{
|
|
|
|
|
- emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool),
|
|
|
|
|
- emqx_rule_matched => ?MG(matched, Counters),
|
|
|
|
|
- emqx_rule_failed => ?MG(failed, Counters),
|
|
|
|
|
- emqx_rule_passed => ?MG(passed, Counters),
|
|
|
|
|
- emqx_rule_failed_exception => ?MG('failed.exception', Counters),
|
|
|
|
|
- emqx_rule_failed_no_result => ?MG('failed.no_result', Counters),
|
|
|
|
|
- emqx_rule_actions_total => ?MG('actions.total', Counters),
|
|
|
|
|
- emqx_rule_actions_success => ?MG('actions.success', Counters),
|
|
|
|
|
- emqx_rule_actions_failed => ?MG('actions.failed', Counters),
|
|
|
|
|
- emqx_rule_actions_failed_out_of_service => ?MG(
|
|
|
|
|
- 'actions.failed.out_of_service', Counters
|
|
|
|
|
- ),
|
|
|
|
|
- emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters)
|
|
|
|
|
- }
|
|
|
|
|
- end.
|
|
|
|
|
|
|
+ #{counters := Counters} =
|
|
|
|
|
+ emqx_metrics_worker:get_metrics(rule_metrics, Id),
|
|
|
|
|
+ #{
|
|
|
|
|
+ emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool),
|
|
|
|
|
+ emqx_rule_matched => ?MG(matched, Counters),
|
|
|
|
|
+ emqx_rule_failed => ?MG(failed, Counters),
|
|
|
|
|
+ emqx_rule_passed => ?MG(passed, Counters),
|
|
|
|
|
+ emqx_rule_failed_exception => ?MG('failed.exception', Counters),
|
|
|
|
|
+ emqx_rule_failed_no_result => ?MG('failed.no_result', Counters),
|
|
|
|
|
+ emqx_rule_actions_total => ?MG('actions.total', Counters),
|
|
|
|
|
+ emqx_rule_actions_success => ?MG('actions.success', Counters),
|
|
|
|
|
+ emqx_rule_actions_failed => ?MG('actions.failed', Counters),
|
|
|
|
|
+ emqx_rule_actions_failed_out_of_service => ?MG('actions.failed.out_of_service', Counters),
|
|
|
|
|
+ emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters)
|
|
|
|
|
+ }.
|
|
|
|
|
|
|
|
%%====================
|
|
%%====================
|
|
|
%% Action Metric
|
|
%% Action Metric
|
|
@@ -546,6 +584,7 @@ get_connector_status(ResourceData) ->
|
|
|
%% merge / zip formatting funcs for type `application/json`
|
|
%% merge / zip formatting funcs for type `application/json`
|
|
|
collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
|
|
collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
|
|
|
RulesD = rules_ov_data(Rules),
|
|
RulesD = rules_ov_data(Rules),
|
|
|
|
|
+ ActionsD = actions_ov_data(Rules),
|
|
|
ConnectorsD = connectors_ov_data(BridgesV1, Connectors),
|
|
ConnectorsD = connectors_ov_data(BridgesV1, Connectors),
|
|
|
|
|
|
|
|
M1 = lists:foldl(
|
|
M1 = lists:foldl(
|
|
@@ -554,13 +593,18 @@ collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
|
|
|
rules_ov_metric(names)
|
|
rules_ov_metric(names)
|
|
|
),
|
|
),
|
|
|
M2 = lists:foldl(
|
|
M2 = lists:foldl(
|
|
|
|
|
+ fun(K, AccIn) -> AccIn#{K => ?MG(K, ActionsD)} end,
|
|
|
|
|
+ #{},
|
|
|
|
|
+ actions_ov_metric(names)
|
|
|
|
|
+ ),
|
|
|
|
|
+ M3 = lists:foldl(
|
|
|
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
|
|
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
|
|
|
#{},
|
|
#{},
|
|
|
connectors_ov_metric(names)
|
|
connectors_ov_metric(names)
|
|
|
),
|
|
),
|
|
|
- M3 = maybe_collect_schema_registry(),
|
|
|
|
|
|
|
+ M4 = maybe_collect_schema_registry(),
|
|
|
|
|
|
|
|
- lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
|
|
|
|
|
|
|
+ lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3, M4]).
|
|
|
|
|
|
|
|
collect_json_data(Data) ->
|
|
collect_json_data(Data) ->
|
|
|
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_data_integration_metrics/3).
|
|
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_data_integration_metrics/3).
|