emqx_utils_SUITE.erl 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include("../../emqx/include/asserts.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  %% Baseline socket options; passed as the first argument to
  %% emqx_utils:merge_opts/2 in t_merge_opts/1.
  -define(SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, {backlog, 512}, {nodelay, true}]).
  %% Common Test callback: the list of test cases is obtained from
  %% emqx_common_test_helpers:all/1 for this module.
  all() ->
      emqx_common_test_helpers:all(?MODULE).
  %% Merges ?SOCKOPTS with a second option list via emqx_utils:merge_opts/2 and
  %% checks two individual values as well as the complete, sorted result.
  t_merge_opts(_) ->
      Overrides = [
          raw,
          binary,
          {backlog, 1024},
          {nodelay, false},
          {max_clients, 1024},
          {acceptors, 16}
      ],
      Merged = emqx_utils:merge_opts(?SOCKOPTS, Overrides),
      ?assertEqual(1024, proplists:get_value(backlog, Merged)),
      ?assertEqual(1024, proplists:get_value(max_clients, Merged)),
      %% The result is sorted before comparison, so element order is not checked.
      ExpectedSorted = [
          binary,
          raw,
          {acceptors, 16},
          {backlog, 1024},
          {max_clients, 1024},
          {nodelay, false},
          {packet, raw},
          {reuseaddr, true}
      ],
      ?assertEqual(ExpectedSorted, lists:sort(Merged)).
  %% emqx_utils:maybe_apply/2 with an identity fun: `undefined` comes back as
  %% `undefined`, and `a` comes back as `a`.
  t_maybe_apply(_) ->
      Id = fun(A) -> A end,
      ?assertEqual(undefined, emqx_utils:maybe_apply(Id, undefined)),
      ?assertEqual(a, emqx_utils:maybe_apply(Id, a)).
  %% emqx_utils:run_fold/3: an empty fun list returns the initial accumulator;
  %% with [Add, Mul], accumulator 1 and state 2 the expected result is 6,
  %% i.e. (1 + 2) * 2.
  t_run_fold(_) ->
      ?assertEqual(1, emqx_utils:run_fold([], 1, state)),
      Steps = [
          fun(Acc, St) -> Acc + St end,
          fun(Acc, St) -> Acc * St end
      ],
      ?assertEqual(6, emqx_utils:run_fold(Steps, 1, 2)).
  %% emqx_utils:pipeline/3 with an empty list, a chain of succeeding steps,
  %% and two failing steps (one arity-1 fun, one arity-2 fun).
  t_pipeline(_) ->
      ?assertEqual({ok, input, state}, emqx_utils:pipeline([], input, state)),
      Noop = fun(_I, _St) -> ok end,
      BumpState = fun(_I, St) -> {ok, St + 1} end,
      BumpBoth = fun(I, St) -> {ok, I + 1, St + 1} end,
      DoubleBoth = fun(I, St) -> {ok, I * 2, St * 2} end,
      Steps = [Noop, BumpState, BumpBoth, DoubleBoth],
      %% Starting from input 1 / state 1: (1,1) -> (1,2) -> (2,3) -> (4,6).
      ?assertEqual({ok, 4, 6}, emqx_utils:pipeline(Steps, 1, 1)),
      %% A two-element error tuple is expected to come back with the state
      %% passed in (1).
      FailTwoTuple = fun(_I) -> {error, undefined} end,
      ?assertEqual({error, undefined, 1}, emqx_utils:pipeline([FailTwoTuple], 1, 1)),
      %% A three-element error tuple carries its own state (2).
      FailThreeTuple = fun(_I, _St) -> {error, undefined, 2} end,
      ?assertEqual({error, undefined, 2}, emqx_utils:pipeline([FailThreeTuple], 1, 1)).
  %% Starts a 1 ms timer, sleeps 2 ms, and expects exactly one
  %% {timeout, Timer, tmsg} message in the mailbox; cancelling afterwards
  %% must still return ok.
  t_start_timer(_) ->
      Timer = emqx_utils:start_timer(1, tmsg),
      timer:sleep(2),
      Received = ?drainMailbox(),
      ?assertEqual([{timeout, Timer, tmsg}], Received),
      ok = emqx_utils:cancel_timer(Timer).
  %% Cancels a zero-delay timer and expects an empty mailbox afterwards;
  %% cancelling `undefined` must also return ok.
  t_cancel_timer(_) ->
      TRef = emqx_utils:start_timer(0, foo),
      ok = emqx_utils:cancel_timer(TRef),
      Leftover = ?drainMailbox(),
      ?assertEqual([], Leftover),
      ok = emqx_utils:cancel_timer(undefined).
  %% emqx_utils:proc_name(emqx_pool, 1) is expected to be the atom emqx_pool_1.
  t_proc_name(_) ->
      Name = emqx_utils:proc_name(emqx_pool, 1),
      ?assertEqual(emqx_pool_1, Name).
  %% Exercises emqx_utils:proc_stats/0,1 against a dead process, the calling
  %% process itself, and a live process holding one unread message.
  t_proc_stats(_) ->
      %% Dead process: wait for the 'DOWN' message instead of sleeping, so the
      %% process is known to have exited before it is inspected.
      {DeadPid, DeadRef} = spawn_monitor(fun() -> exit(normal) end),
      receive
          {'DOWN', DeadRef, process, DeadPid, _Reason} -> ok
      after 5000 ->
          ct:fail(process_did_not_exit)
      end,
      ?assertEqual([], emqx_utils:proc_stats(DeadPid)),
      Parent = self(),
      %% Linked, so that a failed assertion inside the helper process also
      %% fails this test case instead of going unnoticed.
      LivePid = spawn_link(fun() ->
          ?assertMatch([{mailbox_len, 0} | _], emqx_utils:proc_stats()),
          Parent ! {self(), checked},
          %% Selective receive: `msg` stays unread in the mailbox while the
          %% helper waits for `stop`. The timeout only bounds its lifetime.
          receive
              stop -> ok
          after 5000 -> ok
          end
      end),
      %% Only send `msg` once the helper has finished its own empty-mailbox check.
      receive
          {LivePid, checked} -> ok
      after 5000 ->
          ct:fail(helper_not_ready)
      end,
      LivePid ! msg,
      ?assertMatch([{mailbox_len, 1} | _], emqx_utils:proc_stats(LivePid)),
      LivePid ! stop,
      ok.
  %% Posts two deliver messages to the test process and expects
  %% emqx_utils:drain_deliver(2) to return them in the order sent.
  t_drain_deliver(_) ->
      Deliveries = [{deliver, t1, m1}, {deliver, t2, m2}],
      lists:foreach(fun(Delivery) -> self() ! Delivery end, Deliveries),
      ?assertEqual(Deliveries, emqx_utils:drain_deliver(2)).
  %% Two monitored processes exit immediately; emqx_utils:drain_down(2) is
  %% expected to return both pids, and a following call to return [].
  t_drain_down(_) ->
      {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
      {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
      %% Give both 'DOWN' messages time to arrive. They are presumably what
      %% drain_down/1 consumes, so they are not awaited with a receive here.
      timer:sleep(100),
      %% Sort both sides: the relative term order of Pid1 and Pid2 is not
      %% something this test should depend on.
      ?assertEqual(lists:sort([Pid1, Pid2]), lists:sort(emqx_utils:drain_down(2))),
      ?assertEqual([], emqx_utils:drain_down(1)).
  %% emqx_utils:index_of/2 must raise error:badarg when the element is looked
  %% up in an empty list, and return the 1-based position (3 for `a` in
  %% [b, c, a, e, f]) when the element is present.
  t_index_of(_) ->
      %% ?assertError fails the test both when the call returns normally and
      %% when it raises anything other than error:badarg.
      ?assertError(badarg, emqx_utils:index_of(a, [])),
      ?assertEqual(3, emqx_utils:index_of(a, [b, c, a, e, f])).
  %% emqx_utils:check_oom/1 with max_mailbox_size 10: ok with 5 queued
  %% messages, a mailbox_overflow shutdown tuple once 11 are queued.
  t_check(_) ->
      Policy = #{
          enable => true,
          max_heap_size => 1024 * 1024 * 8,
          max_mailbox_size => 10
      },
      %% Posts {msg, 1} .. {msg, Count} to the test process itself.
      Enqueue = fun(Count) ->
          lists:foreach(fun(I) -> self() ! {msg, I} end, lists:seq(1, Count))
      end,
      Enqueue(5),
      ?assertEqual(ok, emqx_utils:check_oom(Policy)),
      %% 5 + 6 = 11 messages are now queued, one above the limit.
      Enqueue(6),
      Overflow = {shutdown, #{reason => mailbox_overflow, value => 11, max => 10}},
      ?assertEqual(Overflow, emqx_utils:check_oom(Policy)).
  %% emqx_utils:tune_heap_size/1 returns `ignore` when the policy is disabled
  %% or max_heap_size is 0; with the enabled policy its result is expected to
  %% match the max_heap_size setting the process had before the call.
  t_tune_heap_size(_Config) ->
      Enabled = #{
          enable => true,
          max_heap_size => 1024 * 1024 * 8,
          max_mailbox_size => 10
      },
      Disabled = Enabled#{enable := false},
      ?assertEqual(ignore, emqx_utils:tune_heap_size(Disabled)),
      %% Setting it to 0 disables the check.
      ZeroHeap = Enabled#{max_heap_size := 0},
      ?assertEqual(ignore, emqx_utils:tune_heap_size(ZeroHeap)),
      {max_heap_size, Original} = process_info(self(), max_heap_size),
      try
          ?assertMatch(Original, emqx_utils:tune_heap_size(Enabled))
      after
          %% Restore the setting captured above, whatever the assertion did.
          process_flag(max_heap_size, Original)
      end.
  %% emqx_utils:rand_seed/0 is expected to return a tuple.
  t_rand_seed(_) ->
      Seed = emqx_utils:rand_seed(),
      ?assert(is_tuple(Seed)).
  %% emqx_utils:now_to_secs/1 applied to os:timestamp() must yield an integer.
  t_now_to_secs(_) ->
      Secs = emqx_utils:now_to_secs(os:timestamp()),
      ?assert(is_integer(Secs)).
  %% emqx_utils:now_to_ms/1 applied to os:timestamp() must yield an integer.
  t_now_to_ms(_) ->
      Millis = emqx_utils:now_to_ms(os:timestamp()),
      ?assert(is_integer(Millis)).
  %% emqx_utils:gen_id/1 must return a list whose length equals the requested
  %% length; checked for 10 and 20.
  t_gen_id(_) ->
      lists:foreach(
          fun(Len) -> ?assertEqual(Len, length(emqx_utils:gen_id(Len))) end,
          [10, 20]
      ).
  %% emqx_utils:pmap/2 over three pairs with a summing fun returns the sums
  %% in input order.
  t_pmap_normal(_) ->
      Sum = fun({A, B}) -> A + B end,
      Pairs = [{2, 3}, {3, 4}, {4, 5}],
      ?assertEqual([5, 7, 9], emqx_utils:pmap(Sum, Pairs)).
  %% emqx_utils:pmap/3 with a 100 ms limit must exit with `timeout` when one
  %% element makes the fun sleep for 1000 ms.
  t_pmap_timeout(_) ->
      SleepOrSum = fun
          (timeout) -> ct:sleep(1000);
          ({A, B}) -> A + B
      end,
      Items = [{2, 3}, {3, 4}, timeout],
      ?assertExit(timeout, emqx_utils:pmap(SleepOrSum, Items, 100)).
  %% An error raised by the mapped fun for one element must surface from
  %% emqx_utils:pmap/2 as error:foobar in the caller.
  t_pmap_exception(_) ->
      RaiseOrSum = fun
          (error) -> error(foobar);
          ({A, B}) -> A + B
      end,
      Items = [{2, 3}, {3, 4}, error],
      ?assertError(foobar, emqx_utils:pmap(RaiseOrSum, Items)).
  %% Runs emqx_utils:pmap/3 with workers that take three times the timeout,
  %% under a snabbkaffe ordering constraint between the
  %% pmap_middleman_sent_response and pmap_timeout trace points. Afterwards no
  %% {Ref, Reply} message may reach the test process, and the call is expected
  %% to have produced [done, done, done].
  t_pmap_late_reply(_) ->
      ?check_trace(
          begin
              ?force_ordering(
                  #{?snk_kind := pmap_middleman_sent_response},
                  #{?snk_kind := pmap_timeout}
              ),
              TimeoutMs = 100,
              SlowWorker = fun(_) ->
                  process_flag(trap_exit, true),
                  timer:sleep(3 * TimeoutMs),
                  done
              end,
              Result = (catch emqx_utils:pmap(SlowWorker, [1, 2, 3], TimeoutMs)),
              %% Watch the mailbox for a stray reply for five timeout periods.
              receive
                  {Ref, Late} when is_reference(Ref) ->
                      ct:fail("should not receive late reply: ~p", [Late])
              after 5 * TimeoutMs ->
                  ok
              end,
              ?assertMatch([done, done, done], Result),
              ok
          end,
          []
      ),
      ok.
  %% Table-driven check of emqx_utils:flattermap/2. Each row is
  %% {Expected, Fun, Input}; the mapped fun may return a bare element, a list
  %% of elements, or [] and the results are expected to be spliced into one
  %% flat list.
  t_flattermap(_) ->
      %% Returns [] / X / [X, X] depending on X rem 3.
      ByRemainder = fun(X) ->
          case X rem 3 of
              0 -> [];
              1 -> X;
              2 -> [X, X]
          end
      end,
      Cases = [
          {[42], fun identity/1, [42]},
          {[42, 42], fun duplicate/1, [42]},
          {[], fun nil/1, [42]},
          {[1, 1, 2, 2, 3, 3], fun duplicate/1, [1, 2, 3]},
          {[], fun nil/1, [1, 2, 3]},
          {[1, 2, 2, 4, 5, 5], ByRemainder, [1, 2, 3, 4, 5]}
      ],
      lists:foreach(
          fun({Expected, Fun, Input}) ->
              ?assertEqual(Expected, emqx_utils:flattermap(Fun, Input))
          end,
          Cases
      ).
  %% Helper for t_flattermap/1: returns a two-element list holding the
  %% argument twice.
  duplicate(Item) ->
      lists:duplicate(2, Item).
  %% Helper for t_flattermap/1: ignores its argument and returns the empty list.
  nil(_Ignored) ->
      [].
  %% Helper for t_flattermap/1: returns its argument unchanged.
  identity(Value) ->
      Value.