emqx_utils_SUITE.erl 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  21. -define(SOCKOPTS, [
  22. binary,
  23. {packet, raw},
  24. {reuseaddr, true},
  25. {backlog, 512},
  26. {nodelay, true}
  27. ]).
  28. all() -> emqx_common_test_helpers:all(?MODULE).
  29. t_merge_opts(_) ->
  30. Opts = emqx_utils:merge_opts(?SOCKOPTS, [
  31. raw,
  32. binary,
  33. {backlog, 1024},
  34. {nodelay, false},
  35. {max_clients, 1024},
  36. {acceptors, 16}
  37. ]),
  38. ?assertEqual(1024, proplists:get_value(backlog, Opts)),
  39. ?assertEqual(1024, proplists:get_value(max_clients, Opts)),
  40. ?assertEqual(
  41. [
  42. binary,
  43. raw,
  44. {acceptors, 16},
  45. {backlog, 1024},
  46. {max_clients, 1024},
  47. {nodelay, false},
  48. {packet, raw},
  49. {reuseaddr, true}
  50. ],
  51. lists:sort(Opts)
  52. ).
  53. t_maybe_apply(_) ->
  54. ?assertEqual(undefined, emqx_utils:maybe_apply(fun(A) -> A end, undefined)),
  55. ?assertEqual(a, emqx_utils:maybe_apply(fun(A) -> A end, a)).
  56. t_run_fold(_) ->
  57. ?assertEqual(1, emqx_utils:run_fold([], 1, state)),
  58. Add = fun(I, St) -> I + St end,
  59. Mul = fun(I, St) -> I * St end,
  60. ?assertEqual(6, emqx_utils:run_fold([Add, Mul], 1, 2)).
  61. t_pipeline(_) ->
  62. ?assertEqual({ok, input, state}, emqx_utils:pipeline([], input, state)),
  63. Funs = [
  64. fun(_I, _St) -> ok end,
  65. fun(_I, St) -> {ok, St + 1} end,
  66. fun(I, St) -> {ok, I + 1, St + 1} end,
  67. fun(I, St) -> {ok, I * 2, St * 2} end
  68. ],
  69. ?assertEqual({ok, 4, 6}, emqx_utils:pipeline(Funs, 1, 1)),
  70. ?assertEqual(
  71. {error, undefined, 1}, emqx_utils:pipeline([fun(_I) -> {error, undefined} end], 1, 1)
  72. ),
  73. ?assertEqual(
  74. {error, undefined, 2},
  75. emqx_utils:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1)
  76. ).
  77. t_start_timer(_) ->
  78. TRef = emqx_utils:start_timer(1, tmsg),
  79. timer:sleep(2),
  80. ?assertEqual([{timeout, TRef, tmsg}], drain()),
  81. ok = emqx_utils:cancel_timer(TRef).
  82. t_cancel_timer(_) ->
  83. Timer = emqx_utils:start_timer(0, foo),
  84. ok = emqx_utils:cancel_timer(Timer),
  85. ?assertEqual([], drain()),
  86. ok = emqx_utils:cancel_timer(undefined).
  87. t_proc_name(_) ->
  88. ?assertEqual(emqx_pool_1, emqx_utils:proc_name(emqx_pool, 1)).
  89. t_proc_stats(_) ->
  90. Pid1 = spawn(fun() -> exit(normal) end),
  91. timer:sleep(10),
  92. ?assertEqual([], emqx_utils:proc_stats(Pid1)),
  93. Pid2 = spawn(fun() ->
  94. ?assertMatch([{mailbox_len, 0} | _], emqx_utils:proc_stats()),
  95. timer:sleep(200)
  96. end),
  97. timer:sleep(10),
  98. Pid2 ! msg,
  99. timer:sleep(10),
  100. ?assertMatch([{mailbox_len, 1} | _], emqx_utils:proc_stats(Pid2)).
  101. t_drain_deliver(_) ->
  102. self() ! {deliver, t1, m1},
  103. self() ! {deliver, t2, m2},
  104. ?assertEqual(
  105. [
  106. {deliver, t1, m1},
  107. {deliver, t2, m2}
  108. ],
  109. emqx_utils:drain_deliver(2)
  110. ).
  111. t_drain_down(_) ->
  112. {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
  113. {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
  114. timer:sleep(100),
  115. ?assertEqual([Pid1, Pid2], lists:sort(emqx_utils:drain_down(2))),
  116. ?assertEqual([], emqx_utils:drain_down(1)).
  117. t_index_of(_) ->
  118. try emqx_utils:index_of(a, []) of
  119. _ -> ct:fail(should_throw_error)
  120. catch
  121. error:Reason ->
  122. ?assertEqual(badarg, Reason)
  123. end,
  124. ?assertEqual(3, emqx_utils:index_of(a, [b, c, a, e, f])).
  125. t_check(_) ->
  126. Policy = #{
  127. max_mailbox_size => 10,
  128. max_heap_size => 1024 * 1024 * 8,
  129. enable => true
  130. },
  131. [self() ! {msg, I} || I <- lists:seq(1, 5)],
  132. ?assertEqual(ok, emqx_utils:check_oom(Policy)),
  133. [self() ! {msg, I} || I <- lists:seq(1, 6)],
  134. ?assertEqual(
  135. {shutdown, #{reason => message_queue_too_long, value => 11, max => 10}},
  136. emqx_utils:check_oom(Policy)
  137. ).
  138. drain() ->
  139. drain([]).
  140. drain(Acc) ->
  141. receive
  142. Msg -> drain([Msg | Acc])
  143. after 0 ->
  144. lists:reverse(Acc)
  145. end.
  146. t_rand_seed(_) ->
  147. ?assert(is_tuple(emqx_utils:rand_seed())).
  148. t_now_to_secs(_) ->
  149. ?assert(is_integer(emqx_utils:now_to_secs(os:timestamp()))).
  150. t_now_to_ms(_) ->
  151. ?assert(is_integer(emqx_utils:now_to_ms(os:timestamp()))).
  152. t_gen_id(_) ->
  153. ?assertEqual(10, length(emqx_utils:gen_id(10))),
  154. ?assertEqual(20, length(emqx_utils:gen_id(20))).
  155. t_pmap_normal(_) ->
  156. ?assertEqual(
  157. [5, 7, 9],
  158. emqx_utils:pmap(
  159. fun({A, B}) -> A + B end,
  160. [{2, 3}, {3, 4}, {4, 5}]
  161. )
  162. ).
  163. t_pmap_timeout(_) ->
  164. ?assertExit(
  165. timeout,
  166. emqx_utils:pmap(
  167. fun
  168. (timeout) -> ct:sleep(1000);
  169. ({A, B}) -> A + B
  170. end,
  171. [{2, 3}, {3, 4}, timeout],
  172. 100
  173. )
  174. ).
  175. t_pmap_exception(_) ->
  176. ?assertError(
  177. foobar,
  178. emqx_utils:pmap(
  179. fun
  180. (error) -> error(foobar);
  181. ({A, B}) -> A + B
  182. end,
  183. [{2, 3}, {3, 4}, error]
  184. )
  185. ).
  186. t_pmap_late_reply(_) ->
  187. ?check_trace(
  188. begin
  189. ?force_ordering(
  190. #{?snk_kind := pmap_middleman_sent_response},
  191. #{?snk_kind := pmap_timeout}
  192. ),
  193. Timeout = 100,
  194. Res =
  195. catch emqx_utils:pmap(
  196. fun(_) ->
  197. process_flag(trap_exit, true),
  198. timer:sleep(3 * Timeout),
  199. done
  200. end,
  201. [1, 2, 3],
  202. Timeout
  203. ),
  204. receive
  205. {Ref, LateReply} when is_reference(Ref) ->
  206. ct:fail("should not receive late reply: ~p", [LateReply])
  207. after (5 * Timeout) ->
  208. ok
  209. end,
  210. ?assertMatch([done, done, done], Res),
  211. ok
  212. end,
  213. []
  214. ),
  215. ok.