emqx_prometheus_data_SUITE.erl 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_prometheus_data_SUITE).
  17. -compile(nowarn_export_all).
  18. -compile(export_all).
  19. -include("emqx_prometheus.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("common_test/include/ct.hrl").
  22. -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
  23. %% erlfmt-ignore
  24. -define(EMQX_CONF, <<"
  25. authentication = [
  26. {
  27. backend = built_in_database
  28. enable = true
  29. mechanism = password_based
  30. password_hash_algorithm {name = sha256, salt_position = suffix}
  31. user_id_type = username
  32. },
  33. {
  34. algorithm = sha256
  35. backend = built_in_database
  36. enable = true
  37. iteration_count = 4096
  38. mechanism = scram
  39. }
  40. ]
  41. authorization {
  42. cache {
  43. enable = true
  44. }
  45. deny_action = ignore
  46. no_match = allow
  47. sources = [
  48. {path = \"${EMQX_ETC_DIR}/acl.conf\", type = file}
  49. ]
  50. }
  51. connectors {
  52. http {
  53. test_http_connector {
  54. ssl {enable = false, verify = verify_peer}
  55. url = \"http://127.0.0.1:3000\"
  56. }
  57. }
  58. }
  59. rule_engine {
  60. ignore_sys_message = true
  61. jq_function_default_timeout = 10s
  62. rules {
  63. rule_xbmw {
  64. actions = [\"mqtt:action1\"]
  65. description = \"\"
  66. enable = true
  67. metadata {created_at = 1707244896918}
  68. sql = \"SELECT * FROM \\\"t/#\\\"\"
  69. }
  70. }
  71. }
  72. ">>).
  73. all() ->
  74. lists:flatten([
  75. {group, '/prometheus/stats'},
  76. {group, '/prometheus/auth'},
  77. {group, '/prometheus/data_integration'},
  78. [{group, '/prometheus/schema_validation'} || emqx_release:edition() == ee]
  79. ]).
  80. groups() ->
  81. TCs = emqx_common_test_helpers:all(?MODULE),
  82. AcceptGroups = [
  83. {group, 'text/plain'},
  84. {group, 'application/json'}
  85. ],
  86. ModeGroups = [
  87. {group, ?PROM_DATA_MODE__NODE},
  88. {group, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED},
  89. {group, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED}
  90. ],
  91. [
  92. {'/prometheus/stats', ModeGroups},
  93. {'/prometheus/auth', ModeGroups},
  94. {'/prometheus/data_integration', ModeGroups},
  95. {'/prometheus/schema_validation', ModeGroups},
  96. {?PROM_DATA_MODE__NODE, AcceptGroups},
  97. {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups},
  98. {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups},
  99. {'text/plain', TCs},
  100. {'application/json', TCs}
  101. ].
  102. init_per_suite(Config) ->
  103. emqx_common_test_helpers:clear_screen(),
  104. meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]),
  105. meck:expect(emqx_retainer, retained_count, fun() -> 0 end),
  106. meck:expect(
  107. emqx_authz_file,
  108. acl_conf_file,
  109. fun() ->
  110. emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf")
  111. end
  112. ),
  113. ok = emqx_prometheus_SUITE:maybe_meck_license(),
  114. emqx_prometheus_SUITE:start_mock_pushgateway(9091),
  115. application:load(emqx_auth),
  116. Apps = emqx_cth_suite:start(
  117. lists:flatten([
  118. emqx,
  119. {emqx_conf, ?EMQX_CONF},
  120. emqx_auth,
  121. emqx_auth_mnesia,
  122. emqx_rule_engine,
  123. emqx_bridge_http,
  124. emqx_connector,
  125. [
  126. {emqx_schema_validation, #{config => schema_validation_config()}}
  127. || emqx_release:edition() == ee
  128. ],
  129. {emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()}
  130. ]),
  131. #{
  132. work_dir => filename:join(?config(priv_dir, Config), ?MODULE)
  133. }
  134. ),
  135. [{apps, Apps} | Config].
  136. end_per_suite(Config) ->
  137. meck:unload([emqx_retainer]),
  138. emqx_prometheus_SUITE:maybe_unmeck_license(),
  139. emqx_prometheus_SUITE:stop_mock_pushgateway(),
  140. {ok, _} = emqx:update_config(
  141. [authorization],
  142. #{
  143. <<"no_match">> => <<"allow">>,
  144. <<"cache">> => #{<<"enable">> => <<"true">>},
  145. <<"sources">> => []
  146. }
  147. ),
  148. emqx_cth_suite:stop(?config(apps, Config)),
  149. ok.
  150. init_per_group('/prometheus/stats', Config) ->
  151. [{module, emqx_prometheus} | Config];
  152. init_per_group('/prometheus/auth', Config) ->
  153. [{module, emqx_prometheus_auth} | Config];
  154. init_per_group('/prometheus/data_integration', Config) ->
  155. [{module, emqx_prometheus_data_integration} | Config];
  156. init_per_group('/prometheus/schema_validation', Config) ->
  157. [{module, emqx_prometheus_schema_validation} | Config];
  158. init_per_group(?PROM_DATA_MODE__NODE, Config) ->
  159. [{mode, ?PROM_DATA_MODE__NODE} | Config];
  160. init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) ->
  161. [{mode, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED} | Config];
  162. init_per_group(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Config) ->
  163. [{mode, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED} | Config];
  164. init_per_group('text/plain', Config) ->
  165. [{accept, 'text/plain'} | Config];
  166. init_per_group('application/json', Config) ->
  167. [{accept, 'application/json'} | Config];
  168. init_per_group(_Group, Config) ->
  169. Config.
  170. end_per_group(_Group, _Config) ->
  171. ok.
  172. init_per_testcase(t_collect_prom_data, Config) ->
  173. meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]),
  174. meck:expect(emqx_utils, gen_id, fun() -> "fake" end),
  175. meck:new(emqx, [non_strict, passthrough, no_history, no_link]),
  176. meck:expect(
  177. emqx,
  178. data_dir,
  179. fun() ->
  180. {data_dir, Data} = lists:keyfind(data_dir, 1, Config),
  181. Data
  182. end
  183. ),
  184. Config;
  185. init_per_testcase(_, Config) ->
  186. Config.
  187. end_per_testcase(t_collect_prom_data, _Config) ->
  188. meck:unload(emqx_utils),
  189. meck:unload(emqx),
  190. ok;
  191. end_per_testcase(_, _Config) ->
  192. ok.
  193. %%--------------------------------------------------------------------
  194. %% Cases
  195. %%--------------------------------------------------------------------
  196. t_collect_prom_data(Config) ->
  197. CollectOpts = collect_opts(Config),
  198. Module = ?config(module, Config),
  199. Response = emqx_prometheus_api:collect(Module, CollectOpts),
  200. assert_data(Module, Response, CollectOpts).
  201. %%--------------------------------------------------------------------
  202. %% Helper fns
  203. %%--------------------------------------------------------------------
  204. assert_data(_Module, {Code, Header, RawDataBinary}, #{type := <<"prometheus">>, mode := Mode}) ->
  205. ?assertEqual(Code, 200),
  206. ?assertMatch(#{<<"content-type">> := <<"text/plain">>}, Header),
  207. DataL = lists:filter(
  208. fun(B) ->
  209. case re:run(B, <<"^[^#]">>, [global]) of
  210. {match, _} ->
  211. true;
  212. nomatch ->
  213. false
  214. end
  215. end,
  216. binary:split(RawDataBinary, [<<"\n">>], [global])
  217. ),
  218. assert_prom_data(DataL, Mode);
  219. assert_data(Module, {Code, JsonData}, #{type := <<"json">>, mode := Mode}) ->
  220. ?assertEqual(Code, 200),
  221. ?assertMatch(#{}, JsonData),
  222. assert_json_data(Module, JsonData, Mode).
  223. %%%%%%%%%%%%%%%%%%%%
  224. %% assert text/plain format
  225. assert_prom_data(DataL, Mode) ->
  226. NDataL = lists:map(
  227. fun(Line) ->
  228. binary:split(Line, [<<"{">>, <<",">>, <<"} ">>, <<" ">>], [global])
  229. end,
  230. DataL
  231. ),
  232. do_assert_prom_data(NDataL, Mode).
  233. -define(MGU(K, MAP), maps:get(K, MAP, undefined)).
  234. assert_json_data(_, Data, Mode) ->
  235. lists:foreach(
  236. fun(FunSeed) ->
  237. erlang:apply(?MODULE, fun_name(FunSeed), [?MGU(FunSeed, Data), Mode]),
  238. ok
  239. end,
  240. maps:keys(Data)
  241. ),
  242. ok.
  243. fun_name(Seed) ->
  244. binary_to_atom(<<"assert_json_data__", (atom_to_binary(Seed))/binary>>).
  245. %%--------------------------------------------------------------------
  246. %% Internal Functions
  247. %%--------------------------------------------------------------------
  248. collect_opts(Config) ->
  249. #{
  250. type => accept(?config(accept, Config)),
  251. mode => ?config(mode, Config)
  252. }.
  253. accept('text/plain') ->
  254. <<"prometheus">>;
  255. accept('application/json') ->
  256. <<"json">>.
  257. do_assert_prom_data([], _Mode) ->
  258. ok;
  259. do_assert_prom_data([Metric | RestDataL], Mode) ->
  260. [_MetricNamme | _] = Metric,
  261. assert_stats_metric_labels(Metric, Mode),
  262. do_assert_prom_data(RestDataL, Mode).
  263. assert_stats_metric_labels([MetricName | R] = _Metric, Mode) ->
  264. case maps:get(Mode, metric_meta(MetricName), undefined) of
  265. %% for uncatched metrics (by prometheus.erl)
  266. undefined ->
  267. ok;
  268. N when is_integer(N) ->
  269. case N =:= length(lists:droplast(R)) of
  270. true ->
  271. ok;
  272. false ->
  273. ct:print(
  274. "====================~n"
  275. "%% Metric: ~p~n"
  276. "%% Expect labels count: ~p in Mode: ~p~n"
  277. "%% But got labels: ~p~n",
  278. [_Metric, N, Mode, length(lists:droplast(R))]
  279. )
  280. end,
  281. ?assertEqual(N, length(lists:droplast(R)))
  282. end.
  283. -define(meta(NODE, AGGRE, UNAGGRE), #{
  284. ?PROM_DATA_MODE__NODE => NODE,
  285. ?PROM_DATA_MODE__ALL_NODES_AGGREGATED => AGGRE,
  286. ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED => UNAGGRE
  287. }).
  288. %% `/prometheus/stats`
  289. %% BEGIN always no label
  290. metric_meta(<<"emqx_cluster_sessions_count">>) -> ?meta(0, 0, 0);
  291. metric_meta(<<"emqx_cluster_sessions_max">>) -> ?meta(0, 0, 0);
  292. metric_meta(<<"emqx_topics_max">>) -> ?meta(0, 0, 0);
  293. metric_meta(<<"emqx_topics_count">>) -> ?meta(0, 0, 0);
  294. metric_meta(<<"emqx_retained_count">>) -> ?meta(0, 0, 0);
  295. metric_meta(<<"emqx_retained_max">>) -> ?meta(0, 0, 0);
  296. metric_meta(<<"emqx_subscriptions_shared_count">>) -> ?meta(0, 0, 0);
  297. metric_meta(<<"emqx_subscriptions_shared_max">>) -> ?meta(0, 0, 0);
  298. %% END
  299. %% BEGIN no label in mode `node`
  300. metric_meta(<<"emqx_vm_cpu_use">>) -> ?meta(0, 1, 1);
  301. metric_meta(<<"emqx_vm_cpu_idle">>) -> ?meta(0, 1, 1);
  302. metric_meta(<<"emqx_vm_run_queue">>) -> ?meta(0, 1, 1);
  303. metric_meta(<<"emqx_vm_process_messages_in_queues">>) -> ?meta(0, 1, 1);
  304. metric_meta(<<"emqx_vm_total_memory">>) -> ?meta(0, 1, 1);
  305. metric_meta(<<"emqx_vm_used_memory">>) -> ?meta(0, 1, 1);
  306. metric_meta(<<"emqx_cluster_nodes_running">>) -> ?meta(0, 1, 1);
  307. metric_meta(<<"emqx_cluster_nodes_stopped">>) -> ?meta(0, 1, 1);
  308. %% END
  309. metric_meta(<<"emqx_cert_expiry_at">>) -> ?meta(2, 2, 2);
  310. metric_meta(<<"emqx_license_expiry_at">>) -> ?meta(0, 0, 0);
  311. %% mria metric with label `shard` and `node` when not in mode `node`
  312. metric_meta(<<"emqx_mria_", _Tail/binary>>) -> ?meta(1, 2, 2);
  313. %% `/prometheus/auth`
  314. metric_meta(<<"emqx_authn_users_count">>) -> ?meta(1, 1, 1);
  315. metric_meta(<<"emqx_authn_", _Tail/binary>>) -> ?meta(1, 1, 2);
  316. metric_meta(<<"emqx_authz_rules_count">>) -> ?meta(1, 1, 1);
  317. metric_meta(<<"emqx_authz_", _Tail/binary>>) -> ?meta(1, 1, 2);
  318. metric_meta(<<"emqx_banned_count">>) -> ?meta(0, 0, 0);
  319. %% `/prometheus/data_integration`
  320. metric_meta(<<"emqx_rules_count">>) -> ?meta(0, 0, 0);
  321. metric_meta(<<"emqx_connectors_count">>) -> ?meta(0, 0, 0);
  322. metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0);
  323. metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2);
  324. metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2);
  325. metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2);
  326. %% `/prometheus/schema_validation`
  327. metric_meta(<<"emqx_schema_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
  328. %% normal emqx metrics
  329. metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1);
  330. metric_meta(_) -> #{}.
  331. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  332. %%% Assert Json Data Structure
  333. assert_json_data__messages(M, Mode) when
  334. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  335. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  336. ->
  337. ?assertMatch(
  338. #{
  339. emqx_messages_received := _,
  340. emqx_messages_sent := _,
  341. emqx_messages_qos0_received := _,
  342. emqx_messages_qos0_sent := _,
  343. emqx_messages_qos1_received := _,
  344. emqx_messages_qos1_sent := _,
  345. emqx_messages_qos2_received := _,
  346. emqx_messages_qos2_sent := _,
  347. emqx_messages_publish := _,
  348. emqx_messages_dropped := _,
  349. emqx_messages_dropped_expired := _,
  350. emqx_messages_dropped_no_subscribers := _,
  351. emqx_messages_forward := _,
  352. emqx_messages_retained := _,
  353. emqx_messages_delayed := _,
  354. emqx_messages_delivered := _,
  355. emqx_messages_acked := _
  356. },
  357. M
  358. ),
  359. ok;
  360. assert_json_data__messages(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when is_list(Ms) ->
  361. eval_foreach_assert(?FUNCTION_NAME, Ms).
  362. assert_json_data__stats(M, Mode) when
  363. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  364. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  365. ->
  366. ?assertMatch(
  367. #{
  368. emqx_connections_count := _,
  369. emqx_connections_max := _,
  370. emqx_durable_subscriptions_count := _,
  371. emqx_durable_subscriptions_max := _,
  372. emqx_live_connections_count := _,
  373. emqx_live_connections_max := _,
  374. emqx_sessions_count := _,
  375. emqx_sessions_max := _,
  376. emqx_channels_count := _,
  377. emqx_channels_max := _,
  378. emqx_topics_count := _,
  379. emqx_topics_max := _,
  380. emqx_suboptions_count := _,
  381. emqx_suboptions_max := _,
  382. emqx_subscribers_count := _,
  383. emqx_subscribers_max := _,
  384. emqx_subscriptions_count := _,
  385. emqx_subscriptions_max := _,
  386. emqx_subscriptions_shared_count := _,
  387. emqx_subscriptions_shared_max := _,
  388. emqx_retained_count := _,
  389. emqx_retained_max := _,
  390. emqx_delayed_count := _,
  391. emqx_delayed_max := _
  392. },
  393. M
  394. );
  395. assert_json_data__stats(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when is_list(Ms) ->
  396. eval_foreach_assert(?FUNCTION_NAME, Ms).
  397. assert_json_data__olp(M, Mode) when
  398. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  399. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  400. ->
  401. ?assertMatch(#{}, M);
  402. assert_json_data__olp(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when is_list(Ms) ->
  403. ok.
  404. assert_json_data__client(M, Mode) when
  405. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  406. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  407. ->
  408. ?assertMatch(
  409. #{
  410. emqx_client_connect := _,
  411. emqx_client_connack := _,
  412. emqx_client_connected := _,
  413. emqx_client_authenticate := _,
  414. emqx_client_auth_anonymous := _,
  415. emqx_client_authorize := _,
  416. emqx_client_subscribe := _,
  417. emqx_client_unsubscribe := _,
  418. emqx_client_disconnected := _
  419. },
  420. M
  421. );
  422. assert_json_data__client(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when is_list(Ms) ->
  423. eval_foreach_assert(?FUNCTION_NAME, Ms).
  424. assert_json_data__session(M, Mode) when
  425. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  426. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  427. ->
  428. ?assertMatch(
  429. #{
  430. emqx_session_created := _,
  431. emqx_session_resumed := _,
  432. emqx_session_takenover := _,
  433. emqx_session_discarded := _,
  434. emqx_session_terminated := _
  435. },
  436. M
  437. );
  438. assert_json_data__session(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when is_list(Ms) ->
  439. eval_foreach_assert(?FUNCTION_NAME, Ms).
  440. assert_json_data__metrics(M, ?PROM_DATA_MODE__NODE) ->
  441. ?assertMatch(
  442. #{
  443. emqx_vm_cpu_use := _,
  444. emqx_vm_cpu_idle := _,
  445. emqx_vm_run_queue := _,
  446. emqx_vm_process_messages_in_queues := _,
  447. emqx_vm_total_memory := _,
  448. emqx_vm_used_memory := _
  449. },
  450. M
  451. );
  452. assert_json_data__metrics(Ms, Mode) when
  453. is_list(Ms) andalso
  454. (Mode =:= ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED orelse
  455. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  456. ->
  457. eval_foreach_assert(?FUNCTION_NAME, Ms).
  458. assert_json_data__delivery(M, Mode) when
  459. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  460. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  461. ->
  462. ?assertMatch(
  463. #{
  464. emqx_delivery_dropped := _,
  465. emqx_delivery_dropped_no_local := _,
  466. emqx_delivery_dropped_too_large := _,
  467. emqx_delivery_dropped_qos0_msg := _,
  468. emqx_delivery_dropped_queue_full := _,
  469. emqx_delivery_dropped_expired := _
  470. },
  471. M
  472. );
  473. assert_json_data__delivery(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  474. is_list(Ms)
  475. ->
  476. eval_foreach_assert(?FUNCTION_NAME, Ms).
  477. assert_json_data__cluster(M, Mode) when
  478. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  479. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  480. ->
  481. ?assertMatch(
  482. #{emqx_cluster_nodes_running := _, emqx_cluster_nodes_stopped := _},
  483. M
  484. );
  485. assert_json_data__cluster(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  486. is_list(Ms)
  487. ->
  488. eval_foreach_assert(?FUNCTION_NAME, Ms).
  489. assert_json_data__acl(M, Mode) when
  490. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  491. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  492. ->
  493. ?assertMatch(
  494. #{
  495. emqx_authorization_allow := _,
  496. emqx_authorization_deny := _,
  497. emqx_authorization_cache_hit := _,
  498. emqx_authorization_cache_miss := _,
  499. emqx_authorization_superuser := _,
  500. emqx_authorization_nomatch := _,
  501. emqx_authorization_matched_allow := _,
  502. emqx_authorization_matched_deny := _
  503. },
  504. M
  505. );
  506. assert_json_data__acl(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  507. is_list(Ms)
  508. ->
  509. eval_foreach_assert(?FUNCTION_NAME, Ms).
  510. assert_json_data__authn(M, Mode) when
  511. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  512. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  513. ->
  514. ?assertMatch(
  515. #{
  516. emqx_authentication_success := _,
  517. emqx_authentication_success_anonymous := _,
  518. emqx_authentication_failure := _
  519. },
  520. M
  521. );
  522. assert_json_data__authn(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  523. is_list(Ms)
  524. ->
  525. eval_foreach_assert(?FUNCTION_NAME, Ms).
  526. assert_json_data__packets(M, Mode) when
  527. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  528. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  529. ->
  530. ?assertMatch(
  531. #{
  532. emqx_packets_publish_auth_error := _,
  533. emqx_packets_puback_received := _,
  534. emqx_packets_pubcomp_inuse := _,
  535. emqx_packets_pubcomp_sent := _,
  536. emqx_packets_suback_sent := _,
  537. emqx_packets_pubrel_missed := _,
  538. emqx_packets_publish_inuse := _,
  539. emqx_packets_pingresp_sent := _,
  540. emqx_packets_subscribe_received := _,
  541. emqx_bytes_received := _,
  542. emqx_packets_publish_dropped := _,
  543. emqx_packets_publish_received := _,
  544. emqx_packets_connack_sent := _,
  545. emqx_packets_connack_auth_error := _,
  546. emqx_packets_pubrec_inuse := _,
  547. emqx_packets_sent := _,
  548. emqx_packets_puback_sent := _,
  549. emqx_packets_received := _,
  550. emqx_packets_pubrec_missed := _,
  551. emqx_packets_unsubscribe_received := _,
  552. emqx_packets_puback_inuse := _,
  553. emqx_packets_publish_sent := _,
  554. emqx_packets_pubrec_sent := _,
  555. emqx_packets_pubcomp_received := _,
  556. emqx_packets_disconnect_sent := _,
  557. emqx_packets_unsuback_sent := _,
  558. emqx_bytes_sent := _,
  559. emqx_packets_unsubscribe_error := _,
  560. emqx_packets_auth_received := _,
  561. emqx_packets_subscribe_auth_error := _,
  562. emqx_packets_puback_missed := _,
  563. emqx_packets_publish_error := _,
  564. emqx_packets_subscribe_error := _,
  565. emqx_packets_disconnect_received := _,
  566. emqx_packets_pingreq_received := _,
  567. emqx_packets_pubrel_received := _,
  568. emqx_packets_pubcomp_missed := _,
  569. emqx_packets_pubrec_received := _,
  570. emqx_packets_connack_error := _,
  571. emqx_packets_auth_sent := _,
  572. emqx_packets_pubrel_sent := _,
  573. emqx_packets_connect := _
  574. },
  575. M
  576. );
  577. assert_json_data__packets(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  578. is_list(Ms)
  579. ->
  580. eval_foreach_assert(?FUNCTION_NAME, Ms).
  581. %% certs always return json list
  582. assert_json_data__certs(Ms, _) ->
  583. lists:foreach(
  584. fun(M) ->
  585. ?assertMatch(
  586. #{
  587. emqx_cert_expiry_at := _,
  588. listener_type := _,
  589. listener_name := _
  590. },
  591. M
  592. )
  593. end,
  594. Ms
  595. ).
  596. eval_foreach_assert(FunctionName, Ms) ->
  597. Fun = fun() ->
  598. ok = lists:foreach(
  599. fun(M) -> erlang:apply(?MODULE, FunctionName, [M, ?PROM_DATA_MODE__NODE]) end, Ms
  600. ),
  601. ok = lists:foreach(fun(M) -> ?assertMatch(#{node := _}, M) end, Ms)
  602. end,
  603. Fun().
  604. -if(?EMQX_RELEASE_EDITION == ee).
  605. %% license always map
  606. assert_json_data__license(M, _) ->
  607. ?assertMatch(#{emqx_license_expiry_at := _}, M).
  608. -else.
  609. -endif.
  610. -define(assert_node_foreach(Ms), lists:foreach(fun(M) -> ?assertMatch(#{node := _}, M) end, Ms)).
  611. assert_json_data__emqx_banned(M, _) ->
  612. ?assertMatch(#{emqx_banned_count := _}, M).
  613. assert_json_data__emqx_authn(Ms, Mode) when
  614. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  615. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  616. ->
  617. lists:foreach(
  618. fun(M) ->
  619. ?assertMatch(
  620. #{
  621. id := _,
  622. emqx_authn_enable := _,
  623. emqx_authn_failed := _,
  624. emqx_authn_nomatch := _,
  625. emqx_authn_status := _,
  626. emqx_authn_success := _,
  627. emqx_authn_total := _,
  628. emqx_authn_users_count := _
  629. },
  630. M
  631. )
  632. end,
  633. Ms
  634. );
  635. assert_json_data__emqx_authn(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
  636. ?assert_node_foreach(Ms).
  637. assert_json_data__emqx_authz(Ms, Mode) when
  638. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  639. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  640. ->
  641. lists:foreach(
  642. fun(M) ->
  643. ?assertMatch(
  644. #{
  645. type := _,
  646. emqx_authz_allow := _,
  647. emqx_authz_deny := _,
  648. emqx_authz_enable := _,
  649. emqx_authz_nomatch := _,
  650. emqx_authz_rules_count := _,
  651. emqx_authz_status := _,
  652. emqx_authz_total := _
  653. },
  654. M
  655. )
  656. end,
  657. Ms
  658. );
  659. assert_json_data__emqx_authz(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
  660. ?assert_node_foreach(Ms).
  661. assert_json_data__rules(Ms, Mode) when
  662. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  663. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  664. ->
  665. lists:foreach(
  666. fun(M) ->
  667. ?assertMatch(
  668. #{
  669. id := _,
  670. emqx_rule_actions_failed := _,
  671. emqx_rule_actions_failed_out_of_service := _,
  672. emqx_rule_actions_failed_unknown := _,
  673. emqx_rule_actions_success := _,
  674. emqx_rule_actions_total := _,
  675. emqx_rule_enable := _,
  676. emqx_rule_failed := _,
  677. emqx_rule_failed_exception := _,
  678. emqx_rule_failed_no_result := _,
  679. emqx_rule_matched := _,
  680. emqx_rule_passed := _
  681. },
  682. M
  683. )
  684. end,
  685. Ms
  686. );
  687. assert_json_data__rules(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  688. is_list(Ms)
  689. ->
  690. ?assert_node_foreach(Ms).
  691. assert_json_data__actions(Ms, Mode) when
  692. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  693. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  694. ->
  695. lists:foreach(
  696. fun(M) ->
  697. ?assertMatch(
  698. #{
  699. id := _,
  700. emqx_action_dropped := _,
  701. emqx_action_dropped_expired := _,
  702. emqx_action_dropped_other := _,
  703. emqx_action_dropped_queue_full := _,
  704. emqx_action_dropped_resource_not_found := _,
  705. emqx_action_dropped_resource_stopped := _,
  706. emqx_action_enable := _,
  707. emqx_action_failed := _,
  708. emqx_action_inflight := _,
  709. emqx_action_late_reply := _,
  710. emqx_action_matched := _,
  711. emqx_action_queuing := _,
  712. emqx_action_received := _,
  713. emqx_action_retried := _,
  714. emqx_action_retried_failed := _,
  715. emqx_action_retried_success := _,
  716. emqx_action_status := _,
  717. emqx_action_success := _
  718. },
  719. M
  720. )
  721. end,
  722. Ms
  723. );
  724. assert_json_data__actions(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  725. is_list(Ms)
  726. ->
  727. ?assert_node_foreach(Ms).
  728. assert_json_data__connectors(Ms, Mode) when
  729. (Mode =:= ?PROM_DATA_MODE__NODE orelse
  730. Mode =:= ?PROM_DATA_MODE__ALL_NODES_AGGREGATED)
  731. ->
  732. lists:foreach(
  733. fun(M) ->
  734. ?assertMatch(
  735. #{
  736. id := _,
  737. emqx_connector_enable := _,
  738. emqx_connector_status := _
  739. },
  740. M
  741. )
  742. end,
  743. Ms
  744. );
  745. assert_json_data__connectors(Ms, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) when
  746. is_list(Ms)
  747. ->
  748. ?assert_node_foreach(Ms).
  749. -if(?EMQX_RELEASE_EDITION == ee).
  750. assert_json_data__data_integration_overview(M, _) ->
  751. ?assertMatch(
  752. #{
  753. emqx_connectors_count := _,
  754. emqx_rules_count := _,
  755. emqx_schema_registrys_count := _
  756. },
  757. M
  758. ).
  759. -else.
  760. assert_json_data__data_integration_overview(M, _) ->
  761. ?assertMatch(
  762. #{
  763. emqx_connectors_count := _,
  764. emqx_rules_count := _
  765. },
  766. M
  767. ).
  768. -endif.
  769. assert_json_data__schema_validations(Ms, _) ->
  770. lists:foreach(
  771. fun(M) ->
  772. ?assertMatch(
  773. #{
  774. validation_name := _,
  775. emqx_schema_validation_enable := _,
  776. emqx_schema_validation_matched := _,
  777. emqx_schema_validation_failed := _,
  778. emqx_schema_validation_succeeded := _
  779. },
  780. M
  781. )
  782. end,
  783. Ms
  784. ).
  785. schema_validation_config() ->
  786. Validation = #{
  787. <<"enable">> => true,
  788. <<"name">> => <<"my_validation">>,
  789. <<"topics">> => [<<"t/#">>],
  790. <<"strategy">> => <<"all_pass">>,
  791. <<"failure_action">> => <<"drop">>,
  792. <<"checks">> => [
  793. #{
  794. <<"type">> => <<"sql">>,
  795. <<"sql">> => <<"select * where true">>
  796. }
  797. ]
  798. },
  799. #{
  800. <<"schema_validation">> => #{
  801. <<"validations">> => [Validation]
  802. }
  803. }.
  804. stop_apps(Apps) ->
  805. lists:foreach(fun application:stop/1, Apps).