emqx_utils_SUITE.erl 7.4 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("emqx/include/asserts.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  22. -define(SOCKOPTS, [
  23. binary,
  24. {packet, raw},
  25. {reuseaddr, true},
  26. {backlog, 512},
  27. {nodelay, true}
  28. ]).
  29. all() -> emqx_common_test_helpers:all(?MODULE).
  30. t_merge_opts(_) ->
  31. Opts = emqx_utils:merge_opts(?SOCKOPTS, [
  32. raw,
  33. binary,
  34. {backlog, 1024},
  35. {nodelay, false},
  36. {max_clients, 1024},
  37. {acceptors, 16}
  38. ]),
  39. ?assertEqual(1024, proplists:get_value(backlog, Opts)),
  40. ?assertEqual(1024, proplists:get_value(max_clients, Opts)),
  41. ?assertEqual(
  42. [
  43. binary,
  44. raw,
  45. {acceptors, 16},
  46. {backlog, 1024},
  47. {max_clients, 1024},
  48. {nodelay, false},
  49. {packet, raw},
  50. {reuseaddr, true}
  51. ],
  52. lists:sort(Opts)
  53. ).
  54. t_maybe_apply(_) ->
  55. ?assertEqual(undefined, emqx_utils:maybe_apply(fun(A) -> A end, undefined)),
  56. ?assertEqual(a, emqx_utils:maybe_apply(fun(A) -> A end, a)).
  57. t_run_fold(_) ->
  58. ?assertEqual(1, emqx_utils:run_fold([], 1, state)),
  59. Add = fun(I, St) -> I + St end,
  60. Mul = fun(I, St) -> I * St end,
  61. ?assertEqual(6, emqx_utils:run_fold([Add, Mul], 1, 2)).
  62. t_pipeline(_) ->
  63. ?assertEqual({ok, input, state}, emqx_utils:pipeline([], input, state)),
  64. Funs = [
  65. fun(_I, _St) -> ok end,
  66. fun(_I, St) -> {ok, St + 1} end,
  67. fun(I, St) -> {ok, I + 1, St + 1} end,
  68. fun(I, St) -> {ok, I * 2, St * 2} end
  69. ],
  70. ?assertEqual({ok, 4, 6}, emqx_utils:pipeline(Funs, 1, 1)),
  71. ?assertEqual(
  72. {error, undefined, 1}, emqx_utils:pipeline([fun(_I) -> {error, undefined} end], 1, 1)
  73. ),
  74. ?assertEqual(
  75. {error, undefined, 2},
  76. emqx_utils:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1)
  77. ).
  78. t_start_timer(_) ->
  79. TRef = emqx_utils:start_timer(1, tmsg),
  80. timer:sleep(2),
  81. ?assertEqual([{timeout, TRef, tmsg}], ?drainMailbox()),
  82. ok = emqx_utils:cancel_timer(TRef).
  83. t_cancel_timer(_) ->
  84. Timer = emqx_utils:start_timer(0, foo),
  85. ok = emqx_utils:cancel_timer(Timer),
  86. ?assertEqual([], ?drainMailbox()),
  87. ok = emqx_utils:cancel_timer(undefined).
  88. t_proc_name(_) ->
  89. ?assertEqual(emqx_pool_1, emqx_utils:proc_name(emqx_pool, 1)).
  90. t_proc_stats(_) ->
  91. Pid1 = spawn(fun() -> exit(normal) end),
  92. timer:sleep(10),
  93. ?assertEqual([], emqx_utils:proc_stats(Pid1)),
  94. Pid2 = spawn(fun() ->
  95. ?assertMatch([{mailbox_len, 0} | _], emqx_utils:proc_stats()),
  96. timer:sleep(200)
  97. end),
  98. timer:sleep(10),
  99. Pid2 ! msg,
  100. timer:sleep(10),
  101. ?assertMatch([{mailbox_len, 1} | _], emqx_utils:proc_stats(Pid2)).
  102. t_drain_deliver(_) ->
  103. self() ! {deliver, t1, m1},
  104. self() ! {deliver, t2, m2},
  105. ?assertEqual(
  106. [
  107. {deliver, t1, m1},
  108. {deliver, t2, m2}
  109. ],
  110. emqx_utils:drain_deliver(2)
  111. ).
  112. t_drain_down(_) ->
  113. {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
  114. {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
  115. timer:sleep(100),
  116. ?assertEqual([Pid1, Pid2], lists:sort(emqx_utils:drain_down(2))),
  117. ?assertEqual([], emqx_utils:drain_down(1)).
  118. t_index_of(_) ->
  119. try emqx_utils:index_of(a, []) of
  120. _ -> ct:fail(should_throw_error)
  121. catch
  122. error:Reason ->
  123. ?assertEqual(badarg, Reason)
  124. end,
  125. ?assertEqual(3, emqx_utils:index_of(a, [b, c, a, e, f])).
  126. t_check(_) ->
  127. Policy = #{
  128. max_mailbox_size => 10,
  129. max_heap_size => 1024 * 1024 * 8,
  130. enable => true
  131. },
  132. [self() ! {msg, I} || I <- lists:seq(1, 5)],
  133. ?assertEqual(ok, emqx_utils:check_oom(Policy)),
  134. [self() ! {msg, I} || I <- lists:seq(1, 6)],
  135. ?assertEqual(
  136. {shutdown, #{reason => mailbox_overflow, value => 11, max => 10}},
  137. emqx_utils:check_oom(Policy)
  138. ).
  139. t_rand_seed(_) ->
  140. ?assert(is_tuple(emqx_utils:rand_seed())).
  141. t_now_to_secs(_) ->
  142. ?assert(is_integer(emqx_utils:now_to_secs(os:timestamp()))).
  143. t_now_to_ms(_) ->
  144. ?assert(is_integer(emqx_utils:now_to_ms(os:timestamp()))).
  145. t_gen_id(_) ->
  146. ?assertEqual(10, length(emqx_utils:gen_id(10))),
  147. ?assertEqual(20, length(emqx_utils:gen_id(20))).
  148. t_pmap_normal(_) ->
  149. ?assertEqual(
  150. [5, 7, 9],
  151. emqx_utils:pmap(
  152. fun({A, B}) -> A + B end,
  153. [{2, 3}, {3, 4}, {4, 5}]
  154. )
  155. ).
  156. t_pmap_timeout(_) ->
  157. ?assertExit(
  158. timeout,
  159. emqx_utils:pmap(
  160. fun
  161. (timeout) -> ct:sleep(1000);
  162. ({A, B}) -> A + B
  163. end,
  164. [{2, 3}, {3, 4}, timeout],
  165. 100
  166. )
  167. ).
  168. t_pmap_exception(_) ->
  169. ?assertError(
  170. foobar,
  171. emqx_utils:pmap(
  172. fun
  173. (error) -> error(foobar);
  174. ({A, B}) -> A + B
  175. end,
  176. [{2, 3}, {3, 4}, error]
  177. )
  178. ).
  179. t_pmap_late_reply(_) ->
  180. ?check_trace(
  181. begin
  182. ?force_ordering(
  183. #{?snk_kind := pmap_middleman_sent_response},
  184. #{?snk_kind := pmap_timeout}
  185. ),
  186. Timeout = 100,
  187. Res =
  188. catch emqx_utils:pmap(
  189. fun(_) ->
  190. process_flag(trap_exit, true),
  191. timer:sleep(3 * Timeout),
  192. done
  193. end,
  194. [1, 2, 3],
  195. Timeout
  196. ),
  197. receive
  198. {Ref, LateReply} when is_reference(Ref) ->
  199. ct:fail("should not receive late reply: ~p", [LateReply])
  200. after (5 * Timeout) ->
  201. ok
  202. end,
  203. ?assertMatch([done, done, done], Res),
  204. ok
  205. end,
  206. []
  207. ),
  208. ok.
  209. t_flattermap(_) ->
  210. ?assertEqual(
  211. [42],
  212. emqx_utils:flattermap(fun identity/1, [42])
  213. ),
  214. ?assertEqual(
  215. [42, 42],
  216. emqx_utils:flattermap(fun duplicate/1, [42])
  217. ),
  218. ?assertEqual(
  219. [],
  220. emqx_utils:flattermap(fun nil/1, [42])
  221. ),
  222. ?assertEqual(
  223. [1, 1, 2, 2, 3, 3],
  224. emqx_utils:flattermap(fun duplicate/1, [1, 2, 3])
  225. ),
  226. ?assertEqual(
  227. [],
  228. emqx_utils:flattermap(fun nil/1, [1, 2, 3])
  229. ),
  230. ?assertEqual(
  231. [1, 2, 2, 4, 5, 5],
  232. emqx_utils:flattermap(
  233. fun(X) ->
  234. case X rem 3 of
  235. 0 -> [];
  236. 1 -> X;
  237. 2 -> [X, X]
  238. end
  239. end,
  240. [1, 2, 3, 4, 5]
  241. )
  242. ).
  243. duplicate(X) ->
  244. [X, X].
  245. nil(_) ->
  246. [].
  247. identity(X) ->
  248. X.