emqx_utils_stream_tests.erl 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_stream_tests).
  17. -include_lib("eunit/include/eunit.hrl").
  18. empty_test() ->
  19. S = emqx_utils_stream:empty(),
  20. ?assertEqual([], emqx_utils_stream:next(S)).
  21. empty_consume_test() ->
  22. S = emqx_utils_stream:empty(),
  23. ?assertEqual([], emqx_utils_stream:consume(S)).
  24. chain_empties_test() ->
  25. S = emqx_utils_stream:chain(
  26. emqx_utils_stream:empty(),
  27. emqx_utils_stream:empty()
  28. ),
  29. ?assertEqual([], emqx_utils_stream:next(S)).
  30. chain_list_test() ->
  31. S = emqx_utils_stream:chain(
  32. emqx_utils_stream:list([1, 2, 3]),
  33. emqx_utils_stream:list([4, 5, 6])
  34. ),
  35. ?assertEqual(
  36. [1, 2, 3, 4, 5, 6],
  37. emqx_utils_stream:consume(S)
  38. ).
  39. chain_take_test() ->
  40. S = emqx_utils_stream:chain(
  41. emqx_utils_stream:list([1, 2, 3]),
  42. emqx_utils_stream:list([4, 5, 6, 7, 8])
  43. ),
  44. ?assertMatch(
  45. {[1, 2, 3, 4, 5], _SRest},
  46. emqx_utils_stream:consume(5, S)
  47. ),
  48. {_, SRest} = emqx_utils_stream:consume(5, S),
  49. ?assertEqual(
  50. [6, 7, 8],
  51. emqx_utils_stream:consume(5, SRest)
  52. ).
  53. chain_list_map_test() ->
  54. S = emqx_utils_stream:map(
  55. fun integer_to_list/1,
  56. emqx_utils_stream:chain(
  57. emqx_utils_stream:list([1, 2, 3]),
  58. emqx_utils_stream:chain(
  59. emqx_utils_stream:empty(),
  60. emqx_utils_stream:list([4, 5, 6])
  61. )
  62. )
  63. ),
  64. ?assertEqual(
  65. ["1", "2", "3", "4", "5", "6"],
  66. emqx_utils_stream:consume(S)
  67. ).
  68. filter_test() ->
  69. S = emqx_utils_stream:filter(
  70. fun(N) -> N rem 2 =:= 0 end,
  71. emqx_utils_stream:chain(
  72. emqx_utils_stream:list([1, 2, 3]),
  73. emqx_utils_stream:chain(
  74. emqx_utils_stream:empty(),
  75. emqx_utils_stream:list([4, 5, 6])
  76. )
  77. )
  78. ),
  79. ?assertEqual(
  80. [2, 4, 6],
  81. emqx_utils_stream:consume(S)
  82. ).
  83. drop_test() ->
  84. S = emqx_utils_stream:drop(2, emqx_utils_stream:list([1, 2, 3, 4, 5])),
  85. ?assertEqual(
  86. [3, 4, 5],
  87. emqx_utils_stream:consume(S)
  88. ).
  89. foreach_test() ->
  90. Self = self(),
  91. ok = emqx_utils_stream:foreach(
  92. fun(N) -> erlang:send(Self, N) end,
  93. emqx_utils_stream:chain(
  94. emqx_utils_stream:list([1, 2, 3]),
  95. emqx_utils_stream:chain(
  96. emqx_utils_stream:empty(),
  97. emqx_utils_stream:list([4, 5, 6])
  98. )
  99. )
  100. ),
  101. ?assertEqual(
  102. [1, 2, 3, 4, 5, 6],
  103. emqx_utils_stream:consume(emqx_utils_stream:mqueue(100))
  104. ).
  105. fold_test() ->
  106. S = emqx_utils_stream:drop(2, emqx_utils_stream:list([1, 2, 3, 4, 5])),
  107. ?assertEqual(
  108. 3 * 4 * 5,
  109. emqx_utils_stream:fold(fun(X, P) -> P * X end, 1, S)
  110. ).
  111. fold_n_test() ->
  112. S = emqx_utils_stream:repeat(
  113. emqx_utils_stream:map(
  114. fun(X) -> X * 2 end,
  115. emqx_utils_stream:list([1, 2, 3])
  116. )
  117. ),
  118. ?assertMatch(
  119. {2 + 4 + 6 + 2 + 4 + 6 + 2, _SRest},
  120. emqx_utils_stream:fold(fun(X, Sum) -> Sum + X end, 0, _N = 7, S)
  121. ).
  122. chainmap_test() ->
  123. S = emqx_utils_stream:chainmap(
  124. fun(N) ->
  125. case N rem 2 of
  126. 1 ->
  127. emqx_utils_stream:chain(
  128. emqx_utils_stream:chain(emqx_utils_stream:list([N]), []),
  129. emqx_utils_stream:list([N + 1])
  130. );
  131. 0 ->
  132. emqx_utils_stream:empty()
  133. end
  134. end,
  135. emqx_utils_stream:chain(
  136. emqx_utils_stream:list([1, 2, 3]),
  137. emqx_utils_stream:chain(
  138. emqx_utils_stream:empty(),
  139. emqx_utils_stream:list([4, 5, 6])
  140. )
  141. )
  142. ),
  143. ?assertEqual(
  144. [1, 2, 3, 4, 5, 6],
  145. emqx_utils_stream:consume(S)
  146. ).
  147. transpose_test() ->
  148. S = emqx_utils_stream:transpose([
  149. emqx_utils_stream:list([1, 2, 3]),
  150. emqx_utils_stream:list([4, 5, 6, 7])
  151. ]),
  152. ?assertEqual(
  153. [[1, 4], [2, 5], [3, 6]],
  154. emqx_utils_stream:consume(S)
  155. ).
  156. transpose_none_test() ->
  157. ?assertEqual(
  158. [],
  159. emqx_utils_stream:consume(emqx_utils_stream:transpose([]))
  160. ).
  161. transpose_one_test() ->
  162. S = emqx_utils_stream:transpose([emqx_utils_stream:list([1, 2, 3])]),
  163. ?assertEqual(
  164. [[1], [2], [3]],
  165. emqx_utils_stream:consume(S)
  166. ).
  167. transpose_many_test() ->
  168. S = emqx_utils_stream:transpose([
  169. emqx_utils_stream:list([1, 2, 3]),
  170. emqx_utils_stream:list([4, 5, 6, 7]),
  171. emqx_utils_stream:list([8, 9])
  172. ]),
  173. ?assertEqual(
  174. [[1, 4, 8], [2, 5, 9]],
  175. emqx_utils_stream:consume(S)
  176. ).
  177. transpose_many_empty_test() ->
  178. S = emqx_utils_stream:transpose([
  179. emqx_utils_stream:list([1, 2, 3]),
  180. emqx_utils_stream:list([4, 5, 6, 7]),
  181. emqx_utils_stream:empty()
  182. ]),
  183. ?assertEqual(
  184. [],
  185. emqx_utils_stream:consume(S)
  186. ).
  187. repeat_test() ->
  188. S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])),
  189. ?assertMatch(
  190. {[1, 2, 3, 1, 2, 3, 1, 2], _},
  191. emqx_utils_stream:consume(8, S)
  192. ),
  193. {_, SRest} = emqx_utils_stream:consume(8, S),
  194. ?assertMatch(
  195. {[3, 1, 2, 3, 1, 2, 3, 1], _},
  196. emqx_utils_stream:consume(8, SRest)
  197. ).
  198. repeat_empty_test() ->
  199. S = emqx_utils_stream:repeat(emqx_utils_stream:list([])),
  200. ?assertEqual(
  201. [],
  202. emqx_utils_stream:consume(8, S)
  203. ).
  204. transpose_repeat_test() ->
  205. S = emqx_utils_stream:transpose([
  206. emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])),
  207. emqx_utils_stream:list([4, 5, 6, 7, 8])
  208. ]),
  209. ?assertEqual(
  210. [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]],
  211. emqx_utils_stream:consume(S)
  212. ).
  213. mqueue_test() ->
  214. _ = erlang:send_after(1, self(), 1),
  215. _ = erlang:send_after(100, self(), 2),
  216. _ = erlang:send_after(20, self(), 42),
  217. ?assertEqual(
  218. [1, 42, 2],
  219. emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
  220. ).
  221. interleave_test() ->
  222. S1 = emqx_utils_stream:list([1, 2, 3]),
  223. S2 = emqx_utils_stream:list([a, b, c, d]),
  224. ?assertEqual(
  225. [1, 2, a, b, 3, c, d],
  226. emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true))
  227. ).
  228. interleave_stop_test() ->
  229. S1 = emqx_utils_stream:const(1),
  230. S2 = emqx_utils_stream:list([a, b, c, d]),
  231. ?assertEqual(
  232. [1, 1, a, b, 1, 1, c, d, 1, 1],
  233. emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false))
  234. ).
  235. ets_test() ->
  236. T = ets:new(tab, [ordered_set]),
  237. Objects = [{N, N} || N <- lists:seq(1, 10)],
  238. ets:insert(T, Objects),
  239. S = emqx_utils_stream:ets(
  240. fun
  241. (undefined) -> ets:match_object(T, '_', 4);
  242. (Cont) -> ets:match_object(Cont)
  243. end
  244. ),
  245. ?assertEqual(
  246. Objects,
  247. emqx_utils_stream:consume(S)
  248. ).
  249. csv_test() ->
  250. Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
  251. ?assertEqual(
  252. [
  253. #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
  254. #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
  255. ],
  256. emqx_utils_stream:consume(emqx_utils_stream:csv(Data1))
  257. ),
  258. Data2 = <<"h1, h2, h3\nvv1, vv2, vv3\nvv4,vv5,vv6\n">>,
  259. ?assertEqual(
  260. [
  261. #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
  262. #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
  263. ],
  264. emqx_utils_stream:consume(emqx_utils_stream:csv(Data2))
  265. ),
  266. ?assertEqual(
  267. [],
  268. emqx_utils_stream:consume(emqx_utils_stream:csv(<<"">>))
  269. ),
  270. BadData = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5">>,
  271. ?assertException(
  272. error,
  273. bad_format,
  274. emqx_utils_stream:consume(emqx_utils_stream:csv(BadData))
  275. ).