emqx_topic_metrics_api_SUITE.erl 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_topic_metrics_api_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1]).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  %% Baseline configuration handed to `emqx_common_test_helpers:load_config/2`
  %% in `init_per_suite/1`: the `topic_metrics` list starts out empty.
  -define(BASE_CONF, #{<<"topic_metrics">> => []}).
  %% Common Test suite info: a single timetrap entry of 30 seconds.
  suite() ->
      Timetrap = {timetrap, {seconds, 30}},
      [Timetrap].
  %% Common Test callback: the list of test cases is whatever
  %% `emqx_common_test_helpers:all/1` returns for this module.
  all() ->
      Suite = ?MODULE,
      emqx_common_test_helpers:all(Suite).
  %% Runs before every test case: removes each topic currently returned by
  %% `emqx_modules_conf:topic_metrics/0`, then passes Config through unchanged.
  init_per_testcase(_TestCase, Config) ->
      Registered = emqx_modules_conf:topic_metrics(),
      %% The individual removal results are deliberately ignored.
      _ = [emqx_modules_conf:remove_topic_metrics(Topic) || Topic <- Registered],
      Config.
  %% Suite setup: loads the baseline modules configuration, boots the
  %% applications needed by the management API and restarts `gen_rpc`.
  %% Returns Config unchanged.
  init_per_suite(Config) ->
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
      Apps = [emqx_conf, emqx_modules],
      ok = emqx_mgmt_api_test_util:init_suite(Apps),
      %% Depending on the order in which suites run, `gen_rpc` may already
      %% have been started with its own defaults before `emqx_conf` was.
      %% The two have different default `port_discovery` modes, so `gen_rpc`
      %% is explicitly stopped and started again here.
      ok = application:stop(gen_rpc),
      ok = application:start(gen_rpc),
      Config.
  %% Suite teardown: shuts down the applications started in
  %% `init_per_suite/1` and stops `gen_rpc`. Always returns `ok`; the results
  %% of the individual shutdown calls are not checked.
  end_per_suite(_Config) ->
      Apps = [emqx_conf, emqx_modules],
      emqx_mgmt_api_test_util:end_suite(Apps),
      application:stop(gen_rpc),
      ok.
  49. %%------------------------------------------------------------------------------
  50. %% Tests
  51. %%------------------------------------------------------------------------------
  %% Exercises the collection endpoint `mqtt/topic_metrics`: listing,
  %% creating, resetting (a known topic, no topic given, an unknown topic)
  %% and finally deleting the registered topic.
  t_mqtt_topic_metrics_collection(_) ->
      %% Built lazily so that `uri/1` is still called once per request.
      CollectionUri = fun() -> uri(["mqtt", "topic_metrics"]) end,
      Reset = fun(Body) -> request(put, CollectionUri(), Body) end,

      %% The listing is expected to be empty at the start of the case.
      {ok, 200, EmptyListing} = request(get, CollectionUri()),
      ?assertEqual([], emqx_utils_json:decode(EmptyListing)),

      %% Register one topic; it must then be the only entry in the listing.
      {ok, 200, _} = request(post, CollectionUri(), #{<<"topic">> => <<"topic/1/2">>}),
      {ok, 200, Listing} = request(get, CollectionUri()),
      ?assertMatch(
          [#{<<"topic">> := <<"topic/1/2">>, <<"metrics">> := #{}}],
          emqx_utils_json:decode(Listing)
      ),

      %% Reset of the registered topic is accepted.
      ?assertMatch(
          {ok, 200, _},
          Reset(#{<<"topic">> => <<"topic/1/2">>, <<"action">> => <<"reset">>})
      ),
      %% A reset request that names no topic is accepted as well.
      ?assertMatch({ok, 200, _}, Reset(#{<<"action">> => <<"reset">>})),
      %% Reset of a topic that was never registered yields 404.
      ?assertMatch(
          {ok, 404, _},
          Reset(#{<<"topic">> => <<"unknown_topic/1/2">>, <<"action">> => <<"reset">>})
      ),

      %% Deleting the registered topic answers 204.
      ?assertMatch(
          {ok, 204, _},
          request(
              delete,
              uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
          )
      ).
  %% Exercises the single-topic endpoint `mqtt/topic_metrics/<topic>`:
  %% create, list, fetch, delete, and then verify that both fetching and
  %% deleting the removed topic answer 404.
  t_mqtt_topic_metrics(_) ->
      CollectionPath = ["mqtt", "topic_metrics"],
      %% Evaluated on every use, so the encode and `uri/1` calls happen per
      %% request exactly as when written inline.
      TopicUri = fun() ->
          uri(CollectionPath ++ [emqx_http_lib:uri_encode("topic/1/2")])
      end,

      {ok, 200, _} = request(post, uri(CollectionPath), #{<<"topic">> => <<"topic/1/2">>}),
      {ok, 200, Listing} = request(get, uri(CollectionPath)),
      ?assertMatch([_], emqx_utils_json:decode(Listing)),

      {ok, 200, Single} = request(get, TopicUri()),
      ?assertMatch(
          #{<<"topic">> := <<"topic/1/2">>, <<"metrics">> := #{}},
          emqx_utils_json:decode(Single)
      ),

      %% First delete succeeds; afterwards the topic is gone for both verbs.
      ?assertMatch({ok, 204, _}, request(delete, TopicUri())),
      ?assertMatch({ok, 404, _}, request(get, TopicUri())),
      ?assertMatch({ok, 404, _}, request(delete, TopicUri())).
  %% Checks the error responses of the create endpoint: empty topic, wildcard
  %% topic, duplicate topic, and exceeding the number of registered topics.
  t_bad_reqs(_) ->
      Create = fun(Topic) ->
          request(post, uri(["mqtt", "topic_metrics"]), #{<<"topic">> => Topic})
      end,

      %% empty topic
      ?assertMatch({ok, 400, _}, Create(<<"">>)),
      %% wildcard
      ?assertMatch({ok, 400, _}, Create(<<"foo/+/bar">>)),

      {ok, 200, _} = Create(<<"topic/1/2">>),
      %% existing topic
      ?assertMatch({ok, 400, _}, Create(<<"topic/1/2">>)),
      ok = emqx_modules_conf:remove_topic_metrics(<<"topic/1/2">>),

      %% limit: 513 creations are attempted in order; the last one must be
      %% rejected with 409 while the one just before it is still accepted
      %% (presumably the limit is 512 topics -- confirm against the module).
      Responses = [
          Create(<<"topic/", (integer_to_binary(N))/binary>>)
       || N <- lists:seq(1, 513)
      ],
      ?assertMatch(
          [{ok, 409, _}, {ok, 200, _} | _],
          lists:reverse(Responses)
      ),

      %% limit && wildcard
      ?assertMatch({ok, 400, _}, Create(<<"a/+">>)).
  %% Verifies that metrics reported for the same topic are summed by the API.
  %%
  %% `emqx_topic_metrics_proto_v1:metrics/2` is mocked to return two entries
  %% for `topic/1/2` with `messages.dropped.count` of 1 and 2, and the HTTP
  %% response must report 3. The second tuple element is an empty list here
  %% (presumably the list of nodes that failed to answer -- confirm against
  %% the proto module).
  %%
  %% The mock is unloaded in an `after` clause so that a failing match or
  %% assertion cannot leave the module mocked for later test cases.
  t_node_aggregation(_) ->
      TwoNodeResult = [
          #{
              create_time => <<"2022-03-30T13:54:10+03:00">>,
              metrics => #{'messages.dropped.count' => 1},
              reset_time => <<"2022-03-30T13:54:10+03:00">>,
              topic => <<"topic/1/2">>
          },
          #{
              create_time => <<"2022-03-30T13:54:10+03:00">>,
              metrics => #{'messages.dropped.count' => 2},
              reset_time => <<"2022-03-30T13:54:10+03:00">>,
              topic => <<"topic/1/2">>
          }
      ],
      meck:new(emqx_topic_metrics_proto_v1, [passthrough]),
      try
          meck:expect(emqx_topic_metrics_proto_v1, metrics, 2, {TwoNodeResult, []}),
          {ok, 200, Result} = request(
              get,
              uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
          ),
          ?assertMatch(
              #{
                  <<"topic">> := <<"topic/1/2">>,
                  <<"metrics">> := #{<<"messages.dropped.count">> := 3}
              },
              emqx_utils_json:decode(Result)
          )
      after
          %% Runs on success and on failure alike.
          meck:unload(emqx_topic_metrics_proto_v1)
      end.
  %% Verifies that the single-topic endpoint answers 500 when the mocked
  %% `emqx_topic_metrics_proto_v1:metrics/2` returns no results and the local
  %% node in the second tuple element (presumably the list of nodes whose RPC
  %% failed -- confirm against the proto module).
  %%
  %% The mock is unloaded in an `after` clause so that a failing assertion
  %% cannot leave the module mocked for later test cases.
  t_badrpc(_) ->
      meck:new(emqx_topic_metrics_proto_v1, [passthrough]),
      try
          meck:expect(emqx_topic_metrics_proto_v1, metrics, 2, {[], [node()]}),
          ?assertMatch(
              {ok, 500, _},
              request(
                  get,
                  uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
              )
          )
      after
          %% Runs on success and on failure alike.
          meck:unload(emqx_topic_metrics_proto_v1)
      end.
  261. %%------------------------------------------------------------------------------
  262. %% Helpers
  263. %%------------------------------------------------------------------------------