emqx_sys_mon_SUITE.erl 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_sys_mon_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx_mqtt.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -include_lib("common_test/include/ct.hrl").
  22. -define(SYSMON, emqx_sys_mon).
  23. -define(FAKE_PORT, hd(erlang:ports())).
  24. -define(FAKE_INFO, [{timeout, 100}, {in, foo}, {out, {?MODULE, bar, 1}}]).
  25. -define(INPUTINFO, [
  26. {self(), long_gc, fmt("long_gc warning: pid = ~p", [self()]), ?FAKE_INFO},
  27. {self(), long_schedule, fmt("long_schedule warning: pid = ~p", [self()]), ?FAKE_INFO},
  28. {self(), large_heap, fmt("large_heap warning: pid = ~p", [self()]), ?FAKE_INFO},
  29. {
  30. self(),
  31. busy_port,
  32. fmt(
  33. "busy_port warning: suspid = ~p, port = ~p",
  34. [self(), ?FAKE_PORT]
  35. ),
  36. ?FAKE_PORT
  37. },
  38. %% for the case when the port is missing, for some
  39. %% reason.
  40. {
  41. self(),
  42. busy_port,
  43. fmt(
  44. "busy_port warning: suspid = ~p, port = ~p",
  45. [self(), []]
  46. ),
  47. []
  48. },
  49. {
  50. self(),
  51. busy_dist_port,
  52. fmt(
  53. "busy_dist_port warning: suspid = ~p, port = ~p",
  54. [self(), ?FAKE_PORT]
  55. ),
  56. ?FAKE_PORT
  57. },
  58. {?FAKE_PORT, long_schedule, fmt("long_schedule warning: port = ~p", [?FAKE_PORT]), ?FAKE_INFO}
  59. ]).
  60. all() -> emqx_common_test_helpers:all(?MODULE).
  61. init_per_testcase(t_sys_mon = TestCase, Config) ->
  62. Apps = emqx_cth_suite:start(
  63. [
  64. {emqx, #{
  65. override_env => [
  66. {sys_mon, [
  67. {busy_dist_port, true},
  68. {busy_port, false},
  69. {large_heap, 8388608},
  70. {long_schedule, 240},
  71. {long_gc, 0}
  72. ]}
  73. ]
  74. }}
  75. ],
  76. #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
  77. ),
  78. [{apps, Apps} | Config];
  79. init_per_testcase(t_sys_mon2 = TestCase, Config) ->
  80. Apps = emqx_cth_suite:start(
  81. [
  82. {emqx, #{
  83. override_env => [
  84. {sys_mon, [
  85. {busy_dist_port, false},
  86. {busy_port, true},
  87. {large_heap, 8388608},
  88. {long_schedule, 0},
  89. {long_gc, 200},
  90. {nothing, 0}
  91. ]}
  92. ]
  93. }}
  94. ],
  95. #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
  96. ),
  97. [{apps, Apps} | Config];
  98. init_per_testcase(t_procinfo = TestCase, Config) ->
  99. Apps = emqx_cth_suite:start(
  100. [emqx],
  101. #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
  102. ),
  103. ok = meck:new(emqx_vm, [passthrough, no_history]),
  104. [{apps, Apps} | Config];
  105. init_per_testcase(TestCase, Config) ->
  106. Apps = emqx_cth_suite:start(
  107. [emqx],
  108. #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
  109. ),
  110. [{apps, Apps} | Config].
  111. end_per_testcase(t_procinfo, Config) ->
  112. Apps = ?config(apps, Config),
  113. ok = meck:unload(emqx_vm),
  114. ok = emqx_cth_suite:stop(Apps),
  115. ok;
  116. end_per_testcase(_, Config) ->
  117. Apps = ?config(apps, Config),
  118. ok = emqx_cth_suite:stop(Apps),
  119. ok.
  120. t_procinfo(_) ->
  121. ok = meck:expect(emqx_vm, get_process_info, fun(_) -> [] end),
  122. ok = meck:expect(emqx_vm, get_process_gc_info, fun(_) -> undefined end),
  123. ?assertEqual([{pid, self()}], emqx_sys_mon:procinfo(self())).
  124. t_procinfo_initial_call_and_stacktrace(_) ->
  125. SomePid = proc_lib:spawn(?MODULE, some_function, [self(), arg2]),
  126. receive
  127. {spawned, SomePid} ->
  128. ok
  129. after 100 ->
  130. error(process_not_spawned)
  131. end,
  132. ProcInfo = emqx_sys_mon:procinfo(SomePid),
  133. ?assertEqual(
  134. {?MODULE, some_function, ['Argument__1', 'Argument__2']},
  135. proplists:get_value(proc_lib_initial_call, ProcInfo)
  136. ),
  137. ?assertMatch(
  138. [
  139. {?MODULE, some_function, 2, [
  140. {file, _},
  141. {line, _}
  142. ]},
  143. {proc_lib, init_p_do_apply, 3, [
  144. {file, _},
  145. {line, _}
  146. ]}
  147. ],
  148. proplists:get_value(current_stacktrace, ProcInfo)
  149. ),
  150. SomePid ! stop.
  151. t_sys_mon(_Config) ->
  152. lists:foreach(
  153. fun({PidOrPort, SysMonName, ValidateInfo, InfoOrPort}) ->
  154. validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort)
  155. end,
  156. ?INPUTINFO
  157. ).
  158. %% Existing port, but closed.
  159. t_sys_mon_dead_port(_Config) ->
  160. process_flag(trap_exit, true),
  161. Port = dead_port(),
  162. {PidOrPort, SysMonName, ValidateInfo, InfoOrPort} =
  163. {
  164. self(),
  165. busy_port,
  166. fmt(
  167. "busy_port warning: suspid = ~p, port = ~p",
  168. [self(), Port]
  169. ),
  170. Port
  171. },
  172. validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort).
  173. t_sys_mon2(_Config) ->
  174. ?SYSMON ! {timeout, ignored, reset},
  175. ?SYSMON ! {ignored},
  176. ?assertEqual(ignored, gen_server:call(?SYSMON, ignored)),
  177. ?assertEqual(ok, gen_server:cast(?SYSMON, ignored)),
  178. gen_server:stop(?SYSMON).
  179. validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort) ->
  180. {ok, C} = emqtt:start_link([{host, "localhost"}]),
  181. {ok, _} = emqtt:connect(C),
  182. emqtt:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1),
  183. timer:sleep(100),
  184. ?SYSMON ! {monitor, PidOrPort, SysMonName, InfoOrPort},
  185. receive
  186. {publish, #{payload := Info}} ->
  187. ?assertEqual(ValidateInfo, binary_to_list(Info)),
  188. ct:pal("OK - received msg: ~p~n", [Info])
  189. after 1000 ->
  190. ct:fail(timeout)
  191. end,
  192. emqtt:stop(C).
  193. fmt(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
  194. some_function(Parent, _Arg2) ->
  195. Parent ! {spawned, self()},
  196. receive
  197. stop ->
  198. ok
  199. end.
  200. dead_port() ->
  201. Port = erlang:open_port({spawn, "ls"}, []),
  202. exit(Port, kill),
  203. Port.