emqx_topic_metrics_SUITE.erl 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  %%--------------------------------------------------------------------
  %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  %%
  %% Licensed under the Apache License, Version 2.0 (the "License");
  %% you may not use this file except in compliance with the License.
  %% You may obtain a copy of the License at
  %%
  %%     http://www.apache.org/licenses/LICENSE-2.0
  %%
  %% Unless required by applicable law or agreed to in writing, software
  %% distributed under the License is distributed on an "AS IS" BASIS,
  %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  %% See the License for the specific language governing permissions and
  %% limitations under the License.
  %%--------------------------------------------------------------------
  -module(emqx_topic_metrics_SUITE).

  %% Test suite: export every function and silence the export_all warning.
  -compile([export_all, nowarn_export_all]).

  %% Configuration map handed to load_config in init_per_suite/1:
  %% a `topic_metrics` key holding an empty list.
  -define(TOPIC, #{<<"topic_metrics">> => []}).

  -include_lib("eunit/include/eunit.hrl").
  -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  %% Common Test callback. The list of test cases is whatever
  %% emqx_common_test_helpers:all/1 returns for this module.
  all() ->
      emqx_common_test_helpers:all(?MODULE).
  %% Suite setup: boots all modules, loads the ?TOPIC configuration against
  %% emqx_modules_schema (must succeed), then starts emqx_conf and emqx_modules.
  %% Returns Config unchanged.
  init_per_suite(Config) ->
      emqx_common_test_helpers:boot_modules(all),
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?TOPIC),
      AppsToStart = [emqx_conf, emqx_modules],
      emqx_common_test_helpers:start_apps(AppsToStart),
      Config.
  %% Suite teardown: stops the applications started in init_per_suite/1,
  %% in the reverse order.
  end_per_suite(_Config) ->
      AppsToStop = [emqx_modules, emqx_conf],
      emqx_common_test_helpers:stop_apps(AppsToStop).
  %% Per-test setup: enables topic metrics and deregisters every topic so each
  %% test case starts without registered topics. Returns Config unchanged.
  init_per_testcase(_TestCase, Config) ->
      emqx_topic_metrics:enable(),
      emqx_topic_metrics:deregister_all(),
      Config.
  %% Per-test teardown.
  %%
  %% t_metrics_not_started terminates the emqx_topic_metrics child itself, so
  %% its teardown only asks the supervisor to restart that child. The result is
  %% deliberately ignored because the test case may already have restarted it.
  end_per_testcase(t_metrics_not_started, _Config) ->
      _ = supervisor:restart_child(emqx_modules_sup, emqx_topic_metrics),
      ok;
  %% Every other test case: deregister all topics, reset the topic_metrics
  %% configuration to an empty list, and disable topic metrics.
  end_per_testcase(_TestCase, _Config) ->
      emqx_topic_metrics:deregister_all(),
      emqx_config:put([topic_metrics], []),
      emqx_topic_metrics:disable().
  %% val/2, inc/2 and rate/2 must report {error, topic_not_found} for a topic
  %% that is not registered, and {error, invalid_metric} for an unknown metric
  %% name on a registered topic.
  t_nonexistent_topic_metrics(_) ->
      Topic = <<"a/b/c">>,
      NotFound = {error, topic_not_found},
      InvalidMetric = {error, invalid_metric},

      %% Topic not registered yet.
      ?assertEqual(NotFound, emqx_topic_metrics:val(Topic, 'messages.in')),
      ?assertEqual(NotFound, emqx_topic_metrics:inc(Topic, 'messages.in')),
      ?assertEqual(NotFound, emqx_topic_metrics:rate(Topic, 'messages.in')),
      %% Disabled check for rates/2:
      %% ?assertEqual({error, topic_not_found}, emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in')),

      %% Topic registered: a known metric starts at 0, an unknown one is rejected.
      emqx_topic_metrics:register(Topic),
      ?assertEqual(0, emqx_topic_metrics:val(Topic, 'messages.in')),
      ?assertEqual(InvalidMetric, emqx_topic_metrics:val(Topic, 'invalid.metrics')),
      ?assertEqual(InvalidMetric, emqx_topic_metrics:inc(Topic, 'invalid.metrics')),
      ?assertEqual(InvalidMetric, emqx_topic_metrics:rate(Topic, 'invalid.metrics')),
      %% Disabled check for rates/2:
      %% ?assertEqual(
      %%     {error, invalid_metric},
      %%     emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')
      %% ),
      ok = emqx_topic_metrics:deregister(Topic).
  %% Registration life cycle of a single topic: it is unknown before
  %% register/1, listed afterwards, its 'messages.in' counter starts at 0 and
  %% goes to 1 after one inc/2, and its rate reads 0.
  t_topic_metrics(_) ->
      Topic = <<"a/b/c">>,

      %% Before registration.
      ?assertEqual(false, emqx_topic_metrics:is_registered(Topic)),
      ?assertEqual([], emqx_topic_metrics:all_registered_topics()),

      %% After registration.
      emqx_topic_metrics:register(Topic),
      ?assertEqual(true, emqx_topic_metrics:is_registered(Topic)),
      ?assertEqual([Topic], emqx_topic_metrics:all_registered_topics()),

      %% Counter and rate.
      ?assertEqual(0, emqx_topic_metrics:val(Topic, 'messages.in')),
      ?assertEqual(ok, emqx_topic_metrics:inc(Topic, 'messages.in')),
      ?assertEqual(1, emqx_topic_metrics:val(Topic, 'messages.in')),
      ?assert(0 =:= emqx_topic_metrics:rate(Topic, 'messages.in')),
      %% Disabled check for rates/2:
      %% ?assert(
      %%     emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:=
      %%         #{long => 0, medium => 0, short => 0}
      %% ),
      ok = emqx_topic_metrics:deregister(Topic).
  %% End-to-end check that publishing and delivering through a real MQTT
  %% client updates the per-topic counters.
  %%
  %% Round 1 publishes one message per QoS level with no subscriber; the test
  %% expects 3 messages in and 3 dropped. Round 2 subscribes the same client
  %% and publishes again; the test expects 6 in, 3 out (one per QoS level) and
  %% the dropped counter to stay at 3.
  %%
  %% The client is disconnected explicitly at the end so the connection (which
  %% uses a fixed client id) is not left open, and the subscribe result is
  %% asserted because the round-2 'out' counters depend on it.
  t_hook(_) ->
      Topic = <<"a/b/c">>,
      Payload = <<"Hello world">>,
      emqx_topic_metrics:register(Topic),
      ?assertEqual(0, emqx_topic_metrics:val(Topic, 'messages.in')),
      ?assertEqual(0, emqx_topic_metrics:val(Topic, 'messages.qos0.in')),
      ?assertEqual(0, emqx_topic_metrics:val(Topic, 'messages.out')),
      ?assertEqual(0, emqx_topic_metrics:val(Topic, 'messages.qos0.out')),
      ?assertEqual(0, emqx_topic_metrics:val(Topic, 'messages.dropped')),

      {ok, C} = emqtt:start_link([
          {host, "localhost"},
          {clientid, "myclient"},
          {username, "myuser"}
      ]),
      {ok, _} = emqtt:connect(C),

      %% Publishes one message at each of QoS 0, 1 and 2, in that order.
      PublishEachQoS = fun() ->
          lists:foreach(
              fun(QoS) -> emqtt:publish(C, Topic, Payload, [{qos, QoS}]) end,
              [0, 1, 2]
          )
      end,

      %% Round 1: nobody is subscribed.
      PublishEachQoS(),
      ct:sleep(100),
      ?assertEqual(3, emqx_topic_metrics:val(Topic, 'messages.in')),
      ?assertEqual(1, emqx_topic_metrics:val(Topic, 'messages.qos0.in')),
      ?assertEqual(1, emqx_topic_metrics:val(Topic, 'messages.qos1.in')),
      ?assertEqual(1, emqx_topic_metrics:val(Topic, 'messages.qos2.in')),
      ?assertEqual(3, emqx_topic_metrics:val(Topic, 'messages.dropped')),

      %% Round 2: the publisher also subscribes, so messages are delivered.
      {ok, _, _} = emqtt:subscribe(C, Topic, [{qos, 2}]),
      ct:sleep(100),
      PublishEachQoS(),
      ct:sleep(100),
      ?assertEqual(6, emqx_topic_metrics:val(Topic, 'messages.in')),
      ?assertEqual(2, emqx_topic_metrics:val(Topic, 'messages.qos0.in')),
      ?assertEqual(2, emqx_topic_metrics:val(Topic, 'messages.qos1.in')),
      ?assertEqual(2, emqx_topic_metrics:val(Topic, 'messages.qos2.in')),
      ?assertEqual(3, emqx_topic_metrics:val(Topic, 'messages.out')),
      ?assertEqual(1, emqx_topic_metrics:val(Topic, 'messages.qos0.out')),
      ?assertEqual(1, emqx_topic_metrics:val(Topic, 'messages.qos1.out')),
      ?assertEqual(1, emqx_topic_metrics:val(Topic, 'messages.qos2.out')),
      ?assertEqual(3, emqx_topic_metrics:val(Topic, 'messages.dropped')),

      %% Release the MQTT connection before dropping the topic registration.
      ok = emqtt:disconnect(C),
      ok = emqx_topic_metrics:deregister(Topic).
  %% Kills the emqx_topic_metrics process and waits (up to 500 ms) for the
  %% emqx_topic_metrics_started trace event. Afterwards the topic that was put
  %% into the topic_metrics configuration must be the only registered topic.
  t_topic_server_restart(_) ->
      Topic = <<"a/b/c">>,
      emqx_config:put([topic_metrics], [#{topic => Topic}]),
      ?check_trace(
          begin
              ?wait_async_action(
                  erlang:exit(whereis(emqx_topic_metrics), kill),
                  #{?snk_kind := emqx_topic_metrics_started},
                  500
              )
          end,
          fun(Trace) ->
              %% At least one "started" event must have been traced.
              ?assertMatch([_ | _], ?of_kind(emqx_topic_metrics_started, Trace))
          end
      ),
      ?assertEqual([Topic], emqx_topic_metrics:all_registered_topics()).
  %% Sends an unexpected plain message and an unexpected cast to the
  %% emqx_topic_metrics process, waits (up to 500 ms each) for the matching
  %% trace events, and checks that the same pid is still registered afterwards.
  t_unknown_messages(_) ->
      Server = whereis(emqx_topic_metrics),
      ?check_trace(
          begin
              ?wait_async_action(
                  Server ! unknown,
                  #{?snk_kind := emqx_topic_metrics_unexpected_info},
                  500
              ),
              ?wait_async_action(
                  gen_server:cast(Server, unknown),
                  #{?snk_kind := emqx_topic_metrics_unexpected_cast},
                  500
              )
          end,
          fun(Trace) ->
              ?assertMatch([_ | _], ?of_kind(emqx_topic_metrics_unexpected_info, Trace)),
              ?assertMatch([_ | _], ?of_kind(emqx_topic_metrics_unexpected_cast, Trace))
          end
      ),
      %% emqx_topic_metrics did not crash from the unexpected info/cast messages
      ?assertEqual(Server, whereis(emqx_topic_metrics)).
  %% With the emqx_topic_metrics child terminated, is_registered/1 must answer
  %% false for a topic that was registered beforehand. The child is restarted
  %% at the end, and that restart must succeed.
  t_metrics_not_started(_Config) ->
      Topic = <<"a/b/c">>,
      _ = emqx_topic_metrics:register(Topic),
      ?assert(emqx_topic_metrics:is_registered(Topic)),
      ok = supervisor:terminate_child(emqx_modules_sup, emqx_topic_metrics),
      ?assertNot(emqx_topic_metrics:is_registered(Topic)),
      {ok, _} = supervisor:restart_child(emqx_modules_sup, emqx_topic_metrics).