emqx_prometheus_data_integration.erl 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_prometheus_data_integration).
  17. -export([
  18. deregister_cleanup/1,
  19. collect_mf/2,
  20. collect_metrics/2
  21. ]).
  22. -export([collect/1]).
  23. -export([
  24. zip_json_data_integration_metrics/3
  25. ]).
  26. %% for bpapi
  27. -behaviour(emqx_prometheus_cluster).
  28. -export([
  29. fetch_from_local_node/1,
  30. fetch_cluster_consistented_data/0,
  31. aggre_or_zip_init_acc/0,
  32. logic_sum_metrics/0
  33. ]).
  34. -export([add_collect_family/4]).
  35. -include("emqx_prometheus.hrl").
  36. -include_lib("prometheus/include/prometheus.hrl").
  37. -import(
  38. prometheus_model_helpers,
  39. [
  40. create_mf/5,
  41. gauge_metric/1,
  42. gauge_metrics/1,
  43. counter_metrics/1
  44. ]
  45. ).
  46. %% Please don't remove this attribute, prometheus uses it to
  47. %% automatically register collectors.
  48. -behaviour(prometheus_collector).
  49. %%--------------------------------------------------------------------
  50. %% Macros
  51. %%--------------------------------------------------------------------
  52. -define(METRIC_NAME_PREFIX, "emqx_data_integration_").
  53. -define(MG(K, MAP), maps:get(K, MAP)).
  54. -define(MG0(K, MAP), maps:get(K, MAP, 0)).
  55. %%--------------------------------------------------------------------
  56. %% Callback for emqx_prometheus_cluster
  57. %%--------------------------------------------------------------------
  58. -define(ROOT_KEY_ACTIONS, actions).
  59. fetch_from_local_node(Mode) ->
  60. Rules = emqx_rule_engine:get_rules(),
  61. BridgesV1 = emqx:get_config([bridges], #{}),
  62. BridgeV2Actions = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS),
  63. Connectors = emqx_connector:list(),
  64. {node(self()), #{
  65. rule_metric_data => rule_metric_data(Mode, Rules),
  66. action_metric_data => action_metric_data(Mode, BridgeV2Actions),
  67. connector_metric_data => connector_metric_data(Mode, BridgesV1, Connectors)
  68. }}.
  69. fetch_cluster_consistented_data() ->
  70. Rules = emqx_rule_engine:get_rules(),
  71. %% for bridge v1
  72. BridgesV1 = emqx:get_config([bridges], #{}),
  73. Connectors = emqx_connector:list(),
  74. (maybe_collect_schema_registry())#{
  75. rules_ov_data => rules_ov_data(Rules),
  76. actions_ov_data => actions_ov_data(Rules),
  77. connectors_ov_data => connectors_ov_data(BridgesV1, Connectors)
  78. }.
  79. aggre_or_zip_init_acc() ->
  80. #{
  81. rule_metric_data => maps:from_keys(rule_metric(names), []),
  82. action_metric_data => maps:from_keys(action_metric(names), []),
  83. connector_metric_data => maps:from_keys(connectr_metric(names), [])
  84. }.
  85. logic_sum_metrics() ->
  86. [
  87. emqx_rule_enable,
  88. emqx_connector_enable,
  89. emqx_connector_status
  90. ].
  91. %%--------------------------------------------------------------------
  92. %% Collector API
  93. %%--------------------------------------------------------------------
  94. %% @private
  95. deregister_cleanup(_) -> ok.
  96. %% @private
  97. -spec collect_mf(_Registry, Callback) -> ok when
  98. _Registry :: prometheus_registry:registry(),
  99. Callback :: prometheus_collector:collect_mf_callback().
  100. collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
  101. RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
  102. %% Data Integration Overview
  103. ok = add_collect_family(
  104. Callback,
  105. rules_ov_metric_meta(),
  106. ?MG(rules_ov_data, RawData)
  107. ),
  108. ok = add_collect_family(
  109. Callback,
  110. actions_ov_metric_meta(),
  111. ?MG(actions_ov_data, RawData)
  112. ),
  113. ok = add_collect_family(
  114. Callback,
  115. connectors_ov_metric_meta(),
  116. ?MG(connectors_ov_data, RawData)
  117. ),
  118. ok = maybe_collect_family_schema_registry(Callback),
  119. %% Rule Metric
  120. RuleMetricDs = ?MG(rule_metric_data, RawData),
  121. ok = add_collect_family(Callback, rule_metric_meta(), RuleMetricDs),
  122. %% Action Metric
  123. ActionMetricDs = ?MG(action_metric_data, RawData),
  124. ok = add_collect_family(Callback, action_metric_meta(), ActionMetricDs),
  125. %% Connector Metric
  126. ConnectorMetricDs = ?MG(connector_metric_data, RawData),
  127. ok = add_collect_family(Callback, connector_metric_meta(), ConnectorMetricDs),
  128. ok;
  129. collect_mf(_, _) ->
  130. ok.
  131. %% @private
  132. collect(<<"json">>) ->
  133. RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
  134. Rules = emqx_rule_engine:get_rules(),
  135. Connectors = emqx_connector:list(),
  136. %% for bridge v1
  137. BridgesV1 = emqx:get_config([bridges], #{}),
  138. #{
  139. data_integration_overview => collect_data_integration_overview(
  140. Rules, BridgesV1, Connectors
  141. ),
  142. rules => collect_json_data(?MG(rule_metric_data, RawData)),
  143. actions => collect_json_data(?MG(action_metric_data, RawData)),
  144. connectors => collect_json_data(?MG(connector_metric_data, RawData))
  145. };
  146. collect(<<"prometheus">>) ->
  147. prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY).
  148. %%====================
  149. %% API Helpers
  150. add_collect_family(Callback, MetricWithType, Data) ->
  151. _ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType],
  152. ok.
  153. add_collect_family(Name, Data, Callback, Type) ->
  154. %% TODO: help document from Name
  155. Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
  156. collect_metrics(Name, Metrics) ->
  157. collect_di(Name, Metrics).
  158. %%--------------------------------------------------------------------
  159. %% Collector
  160. %%--------------------------------------------------------------------
  161. %%========================================
  162. %% Data Integration Overview
  163. %%========================================
  164. %%====================
  165. %% All Rules
  166. %% Rules
  167. collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data));
  168. %%====================
  169. %% Actions
  170. collect_di(K = emqx_actions_count, Data) -> gauge_metric(?MG(K, Data));
  171. %%====================
  172. %% Schema Registry
  173. collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data));
  174. %%====================
  175. %% Connectors
  176. collect_di(K = emqx_connectors_count, Data) -> gauge_metric(?MG(K, Data));
  177. %%========================================
  178. %% Data Integration Metric for: Rule && Action && Connector
  179. %%========================================
  180. %%====================
  181. %% Rule Metric
  182. collect_di(K = emqx_rule_enable, Data) -> gauge_metrics(?MG(K, Data));
  183. collect_di(K = emqx_rule_matched, Data) -> counter_metrics(?MG(K, Data));
  184. collect_di(K = emqx_rule_failed, Data) -> counter_metrics(?MG(K, Data));
  185. collect_di(K = emqx_rule_passed, Data) -> counter_metrics(?MG(K, Data));
  186. collect_di(K = emqx_rule_failed_exception, Data) -> counter_metrics(?MG(K, Data));
  187. collect_di(K = emqx_rule_failed_no_result, Data) -> counter_metrics(?MG(K, Data));
  188. collect_di(K = emqx_rule_actions_total, Data) -> counter_metrics(?MG(K, Data));
  189. collect_di(K = emqx_rule_actions_success, Data) -> counter_metrics(?MG(K, Data));
  190. collect_di(K = emqx_rule_actions_failed, Data) -> counter_metrics(?MG(K, Data));
  191. collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> counter_metrics(?MG(K, Data));
  192. collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data));
  193. collect_di(K = emqx_rule_actions_discarded, Data) -> counter_metrics(?MG(K, Data));
  194. %%====================
  195. %% Action Metric
  196. collect_di(K = emqx_action_enable, Data) -> gauge_metrics(?MG(K, Data));
  197. collect_di(K = emqx_action_status, Data) -> gauge_metrics(?MG(K, Data));
  198. collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data));
  199. collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data));
  200. collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data));
  201. collect_di(K = emqx_action_failed, Data) -> counter_metrics(?MG(K, Data));
  202. %% inflight type: gauge
  203. collect_di(K = emqx_action_inflight, Data) -> gauge_metrics(?MG(K, Data));
  204. collect_di(K = emqx_action_received, Data) -> counter_metrics(?MG(K, Data));
  205. collect_di(K = emqx_action_late_reply, Data) -> counter_metrics(?MG(K, Data));
  206. collect_di(K = emqx_action_retried, Data) -> counter_metrics(?MG(K, Data));
  207. collect_di(K = emqx_action_retried_success, Data) -> counter_metrics(?MG(K, Data));
  208. collect_di(K = emqx_action_retried_failed, Data) -> counter_metrics(?MG(K, Data));
  209. collect_di(K = emqx_action_dropped_resource_stopped, Data) -> counter_metrics(?MG(K, Data));
  210. collect_di(K = emqx_action_dropped_resource_not_found, Data) -> counter_metrics(?MG(K, Data));
  211. collect_di(K = emqx_action_dropped_queue_full, Data) -> counter_metrics(?MG(K, Data));
  212. collect_di(K = emqx_action_dropped_other, Data) -> counter_metrics(?MG(K, Data));
  213. collect_di(K = emqx_action_dropped_expired, Data) -> counter_metrics(?MG(K, Data));
  214. %% queuing type: gauge
  215. collect_di(K = emqx_action_queuing, Data) -> gauge_metrics(?MG(K, Data));
  216. %%====================
  217. %% Connector Metric
  218. collect_di(K = emqx_connector_enable, Data) -> gauge_metrics(?MG(K, Data));
  219. collect_di(K = emqx_connector_status, Data) -> gauge_metrics(?MG(K, Data)).
  220. %%--------------------------------------------------------------------
  221. %% Internal functions
  222. %%--------------------------------------------------------------------
  223. %%========================================
  224. %% Data Integration Overview
  225. %%========================================
  226. %%====================
  227. %% All Rules
  228. rules_ov_metric_meta() ->
  229. [
  230. {emqx_rules_count, gauge}
  231. ].
  232. rules_ov_metric(names) ->
  233. emqx_prometheus_cluster:metric_names(rules_ov_metric_meta()).
  234. -define(RULE_TAB, emqx_rule_engine).
  235. rules_ov_data(_Rules) ->
  236. #{
  237. emqx_rules_count => ets:info(?RULE_TAB, size)
  238. }.
  239. %%====================
  240. %% Actions
  241. actions_ov_metric_meta() ->
  242. [
  243. {emqx_actions_count, gauge}
  244. ].
  245. actions_ov_metric(names) ->
  246. emqx_prometheus_cluster:metric_names(actions_ov_metric_meta()).
  247. actions_ov_data(Rules) ->
  248. ActionsCount = lists:foldl(
  249. fun
  250. (#{actions := Actions} = _Rule, AccIn) ->
  251. AccIn + length(Actions);
  252. (_, AccIn) ->
  253. AccIn
  254. end,
  255. 0,
  256. Rules
  257. ),
  258. #{
  259. emqx_actions_count => ActionsCount
  260. }.
  261. %%====================
  262. %% Schema Registry
  263. -if(?EMQX_RELEASE_EDITION == ee).
  264. maybe_collect_family_schema_registry(Callback) ->
  265. ok = add_collect_family(Callback, schema_registry_metric_meta(), schema_registry_data()),
  266. ok.
  267. schema_registry_metric_meta() ->
  268. [
  269. {emqx_schema_registrys_count, gauge}
  270. ].
  271. schema_registry_data() ->
  272. #{
  273. emqx_schema_registrys_count => erlang:map_size(emqx_schema_registry:list_schemas())
  274. }.
  275. maybe_collect_schema_registry() ->
  276. schema_registry_data().
  277. -else.
  278. maybe_collect_family_schema_registry(_) ->
  279. ok.
  280. maybe_collect_schema_registry() ->
  281. #{}.
  282. -endif.
  283. %%====================
  284. %% Connectors
  285. connectors_ov_metric_meta() ->
  286. [
  287. {emqx_connectors_count, gauge}
  288. ].
  289. connectors_ov_metric(names) ->
  290. emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
  291. connectors_ov_data(BridgesV1, Connectors) ->
  292. %% Both Bridge V1 and V2
  293. V1ConnectorsCnt = maps:fold(
  294. fun(_Type, NameAndConf, AccIn) ->
  295. AccIn + maps:size(NameAndConf)
  296. end,
  297. 0,
  298. BridgesV1
  299. ),
  300. #{
  301. emqx_connectors_count => erlang:length(Connectors) + V1ConnectorsCnt
  302. }.
  303. %%========================================
  304. %% Data Integration Metric for: Rule && Action && Connector
  305. %%========================================
  306. %%====================
  307. %% Rule Metric
  308. %% With rule_id as label key: `rule_id`
  309. rule_metric_meta() ->
  310. [
  311. {emqx_rule_enable, gauge},
  312. {emqx_rule_matched, counter},
  313. {emqx_rule_failed, counter},
  314. {emqx_rule_passed, counter},
  315. {emqx_rule_failed_exception, counter},
  316. {emqx_rule_failed_no_result, counter},
  317. {emqx_rule_actions_total, counter},
  318. {emqx_rule_actions_success, counter},
  319. {emqx_rule_actions_failed, counter},
  320. {emqx_rule_actions_failed_out_of_service, counter},
  321. {emqx_rule_actions_failed_unknown, counter},
  322. {emqx_rule_actions_discarded, counter}
  323. ].
  324. rule_metric(names) ->
  325. emqx_prometheus_cluster:metric_names(rule_metric_meta()).
  326. rule_metric_data(Mode, Rules) ->
  327. lists:foldl(
  328. fun(#{id := Id} = Rule, AccIn) ->
  329. merge_acc_with_rules(Mode, Id, get_metric(Rule), AccIn)
  330. end,
  331. maps:from_keys(rule_metric(names), []),
  332. Rules
  333. ).
  334. merge_acc_with_rules(Mode, Id, RuleMetrics, PointsAcc) ->
  335. maps:fold(
  336. fun(K, V, AccIn) ->
  337. AccIn#{K => [rule_point(Mode, Id, V) | ?MG(K, AccIn)]}
  338. end,
  339. PointsAcc,
  340. RuleMetrics
  341. ).
  342. rule_point(Mode, Id, V) ->
  343. {with_node_label(Mode, [{id, Id}]), V}.
  344. get_metric(#{id := Id, enable := Bool} = _Rule) ->
  345. #{counters := Counters} =
  346. emqx_metrics_worker:get_metrics(rule_metrics, Id),
  347. #{
  348. emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool),
  349. emqx_rule_matched => ?MG(matched, Counters),
  350. emqx_rule_failed => ?MG(failed, Counters),
  351. emqx_rule_passed => ?MG(passed, Counters),
  352. emqx_rule_failed_exception => ?MG('failed.exception', Counters),
  353. emqx_rule_failed_no_result => ?MG('failed.no_result', Counters),
  354. emqx_rule_actions_total => ?MG('actions.total', Counters),
  355. emqx_rule_actions_success => ?MG('actions.success', Counters),
  356. emqx_rule_actions_failed => ?MG('actions.failed', Counters),
  357. emqx_rule_actions_failed_out_of_service => ?MG('actions.failed.out_of_service', Counters),
  358. emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters),
  359. emqx_rule_actions_discarded => ?MG('actions.discarded', Counters)
  360. }.
  361. %%====================
  362. %% Action Metric
  363. %% With action_id: `{type}:{name}` as label key: `action_id`
  364. action_metric_meta() ->
  365. [
  366. {emqx_action_enable, gauge},
  367. {emqx_action_status, gauge},
  368. {emqx_action_matched, counter},
  369. {emqx_action_dropped, counter},
  370. {emqx_action_success, counter},
  371. {emqx_action_failed, counter},
  372. {emqx_action_inflight, gauge},
  373. {emqx_action_received, counter},
  374. {emqx_action_late_reply, counter},
  375. {emqx_action_retried, counter},
  376. {emqx_action_retried_success, counter},
  377. {emqx_action_retried_failed, counter},
  378. {emqx_action_dropped_resource_stopped, counter},
  379. {emqx_action_dropped_resource_not_found, counter},
  380. {emqx_action_dropped_queue_full, counter},
  381. {emqx_action_dropped_other, counter},
  382. {emqx_action_dropped_expired, counter},
  383. {emqx_action_queuing, gauge}
  384. ].
  385. action_metric(names) ->
  386. emqx_prometheus_cluster:metric_names(action_metric_meta()).
  387. action_metric_data(Mode, Bridges) ->
  388. lists:foldl(
  389. fun(#{type := Type, name := Name} = Action, AccIn) ->
  390. Id = emqx_bridge_resource:bridge_id(Type, Name),
  391. Status = get_action_status(Action),
  392. Metrics = get_action_metric(Type, Name),
  393. merge_acc_with_bridges(Mode, Id, maps:merge(Status, Metrics), AccIn)
  394. end,
  395. maps:from_keys(action_metric(names), []),
  396. Bridges
  397. ).
  398. merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) ->
  399. maps:fold(
  400. fun(K, V, AccIn) ->
  401. AccIn#{K => [action_point(Mode, Id, V) | ?MG(K, AccIn)]}
  402. end,
  403. PointsAcc,
  404. BridgeMetrics
  405. ).
  406. get_action_status(#{resource_data := ResourceData} = _Action) ->
  407. Enable = emqx_utils_maps:deep_get([config, enable], ResourceData),
  408. Status = ?MG(status, ResourceData),
  409. #{
  410. emqx_action_enable => emqx_prometheus_cluster:boolean_to_number(Enable),
  411. emqx_action_status => emqx_prometheus_cluster:status_to_number(Status)
  412. }.
  413. action_point(Mode, Id, V) ->
  414. {with_node_label(Mode, [{id, Id}]), V}.
  415. get_action_metric(Type, Name) ->
  416. #{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name),
  417. #{
  418. emqx_action_matched => ?MG0(matched, Counters),
  419. emqx_action_dropped => ?MG0(dropped, Counters),
  420. emqx_action_success => ?MG0(success, Counters),
  421. emqx_action_failed => ?MG0(failed, Counters),
  422. emqx_action_inflight => ?MG0(inflight, Gauges),
  423. emqx_action_received => ?MG0(received, Counters),
  424. emqx_action_late_reply => ?MG0(late_reply, Counters),
  425. emqx_action_retried => ?MG0(retried, Counters),
  426. emqx_action_retried_success => ?MG0('retried.success', Counters),
  427. emqx_action_retried_failed => ?MG0('retried.failed', Counters),
  428. emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters),
  429. emqx_action_dropped_resource_not_found => ?MG0('dropped.resource_not_found', Counters),
  430. emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters),
  431. emqx_action_dropped_other => ?MG0('dropped.other', Counters),
  432. emqx_action_dropped_expired => ?MG0('dropped.expired', Counters),
  433. emqx_action_queuing => ?MG0(queuing, Gauges)
  434. }.
  435. %%====================
  436. %% Connector Metric
  437. %% With connector_id: `{type}:{name}` as label key: `connector_id`
  438. connector_metric_meta() ->
  439. [
  440. {emqx_connector_enable, gauge},
  441. {emqx_connector_status, gauge}
  442. ].
  443. connectr_metric(names) ->
  444. emqx_prometheus_cluster:metric_names(connector_metric_meta()).
  445. connector_metric_data(Mode, BridgesV1, Connectors) ->
  446. AccIn = maps:from_keys(connectr_metric(names), []),
  447. Acc0 = connector_metric_data_v1(Mode, BridgesV1, AccIn),
  448. _AccOut = connector_metric_data_v2(Mode, Connectors, Acc0).
  449. connector_metric_data_v2(Mode, Connectors, InitAcc) ->
  450. lists:foldl(
  451. fun(#{type := Type, name := Name, resource_data := ResourceData} = _Connector, AccIn) ->
  452. Id = emqx_connector_resource:connector_id(Type, Name),
  453. merge_acc_with_connectors(Mode, Id, get_connector_status(ResourceData), AccIn)
  454. end,
  455. InitAcc,
  456. Connectors
  457. ).
  458. connector_metric_data_v1(Mode, BridgesV1, InitAcc) ->
  459. maps:fold(
  460. fun(Type, NameAndConfMap, Acc0) ->
  461. maps:fold(
  462. fun(Name, _Conf, Acc1) ->
  463. BridgeV1Id = emqx_bridge_resource:resource_id(Type, Name),
  464. case emqx_resource:get_instance(BridgeV1Id) of
  465. {error, not_found} ->
  466. Acc1;
  467. {ok, _, ResourceData} ->
  468. merge_acc_with_connectors(
  469. Mode, BridgeV1Id, get_connector_status(ResourceData), Acc1
  470. )
  471. end
  472. end,
  473. Acc0,
  474. NameAndConfMap
  475. )
  476. end,
  477. InitAcc,
  478. BridgesV1
  479. ).
  480. merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
  481. maps:fold(
  482. fun(K, V, AccIn) ->
  483. AccIn#{K => [connector_point(Mode, Id, V) | ?MG(K, AccIn)]}
  484. end,
  485. PointsAcc,
  486. ConnectorMetrics
  487. ).
  488. connector_point(Mode, Id, V) ->
  489. {with_node_label(Mode, [{id, Id}]), V}.
  490. get_connector_status(ResourceData) ->
  491. Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
  492. Status = ?MG(status, ResourceData),
  493. #{
  494. emqx_connector_enable => emqx_prometheus_cluster:boolean_to_number(Enabled),
  495. emqx_connector_status => emqx_prometheus_cluster:status_to_number(Status)
  496. }.
  497. %%--------------------------------------------------------------------
  498. %% Collect functions
  499. %%--------------------------------------------------------------------
  500. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  501. %% merge / zip formatting funcs for type `application/json`
  502. collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
  503. RulesD = rules_ov_data(Rules),
  504. ActionsD = actions_ov_data(Rules),
  505. ConnectorsD = connectors_ov_data(BridgesV1, Connectors),
  506. M1 = lists:foldl(
  507. fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
  508. #{},
  509. rules_ov_metric(names)
  510. ),
  511. M2 = lists:foldl(
  512. fun(K, AccIn) -> AccIn#{K => ?MG(K, ActionsD)} end,
  513. #{},
  514. actions_ov_metric(names)
  515. ),
  516. M3 = lists:foldl(
  517. fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
  518. #{},
  519. connectors_ov_metric(names)
  520. ),
  521. M4 = maybe_collect_schema_registry(),
  522. lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3, M4]).
  523. collect_json_data(Data) ->
  524. emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_data_integration_metrics/3).
  525. %% for initialized empty AccIn
  526. %% The following fields will be put into Result
  527. %% For Rules:
  528. %% `id` => [RULE_ID]
  529. %% For Actions
  530. %% `id` => [ACTION_ID]
  531. %% FOR Connectors
  532. %% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID
  533. %% formatted with {type}:{name}
  534. zip_json_data_integration_metrics(Key, Points, [] = _AccIn) ->
  535. lists:foldl(
  536. fun({Lables, Metric}, AccIn2) ->
  537. LablesKVMap = maps:from_list(Lables),
  538. Point = LablesKVMap#{Key => Metric},
  539. [Point | AccIn2]
  540. end,
  541. [],
  542. Points
  543. );
  544. zip_json_data_integration_metrics(Key, Points, AllResultedAcc) ->
  545. ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
  546. lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
  547. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  548. %% Helper funcs
  549. with_node_label(?PROM_DATA_MODE__NODE, Labels) ->
  550. Labels;
  551. with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) ->
  552. Labels;
  553. with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) ->
  554. [{node, node(self())} | Labels].