emqx_utils_stream_tests.erl 8.0 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_stream_tests).
  17. -include_lib("eunit/include/eunit.hrl").
  %% Asking an empty stream for its next element must yield `[]`.
  empty_test() ->
      Stream = emqx_utils_stream:empty(),
      Next = emqx_utils_stream:next(Stream),
      ?assertEqual([], Next).
  %% Fully consuming an empty stream must produce an empty list.
  empty_consume_test() ->
      Stream = emqx_utils_stream:empty(),
      Items = emqx_utils_stream:consume(Stream),
      ?assertEqual([], Items).
  %% Chaining two empty streams must behave like an empty stream:
  %% `next/1` on the result returns `[]`.
  chain_empties_test() ->
      Left = emqx_utils_stream:empty(),
      Right = emqx_utils_stream:empty(),
      Chained = emqx_utils_stream:chain(Left, Right),
      ?assertEqual([], emqx_utils_stream:next(Chained)).
  %% Chaining two list-backed streams must yield the elements of the first
  %% followed by the elements of the second.
  chain_list_test() ->
      Left = emqx_utils_stream:list([1, 2, 3]),
      Right = emqx_utils_stream:list([4, 5, 6]),
      Chained = emqx_utils_stream:chain(Left, Right),
      Items = emqx_utils_stream:consume(Chained),
      ?assertEqual([1, 2, 3, 4, 5, 6], Items).
  %% Taking a bounded number of elements from a chained stream:
  %% * the first `consume(5, _)` crosses the boundary between both lists and
  %%   returns `{Items, RestStream}`;
  %% * asking the remainder for 5 more when only 3 are left returns the bare
  %%   list `[6, 7, 8]`.
  chain_take_test() ->
      Left = emqx_utils_stream:list([1, 2, 3]),
      Right = emqx_utils_stream:list([4, 5, 6, 7, 8]),
      Chained = emqx_utils_stream:chain(Left, Right),
      ?assertMatch({[1, 2, 3, 4, 5], _Rest}, emqx_utils_stream:consume(5, Chained)),
      %% Consume again from the same starting stream to get hold of the remainder.
      {_, Rest} = emqx_utils_stream:consume(5, Chained),
      Tail = emqx_utils_stream:consume(5, Rest),
      ?assertEqual([6, 7, 8], Tail).
  %% Mapping `integer_to_list/1` over nested chains (with an empty stream in
  %% the middle) must transform every element and keep the original order.
  chain_list_map_test() ->
      Left = emqx_utils_stream:list([1, 2, 3]),
      Nothing = emqx_utils_stream:empty(),
      Right = emqx_utils_stream:list([4, 5, 6]),
      Source = emqx_utils_stream:chain(Left, emqx_utils_stream:chain(Nothing, Right)),
      Mapped = emqx_utils_stream:map(fun integer_to_list/1, Source),
      ?assertEqual(
          ["1", "2", "3", "4", "5", "6"],
          emqx_utils_stream:consume(Mapped)
      ).
  %% Filtering nested chains with an "is even" predicate must keep only the
  %% even elements, in their original order.
  filter_test() ->
      IsEven = fun(X) -> X rem 2 =:= 0 end,
      Left = emqx_utils_stream:list([1, 2, 3]),
      Nothing = emqx_utils_stream:empty(),
      Right = emqx_utils_stream:list([4, 5, 6]),
      Source = emqx_utils_stream:chain(Left, emqx_utils_stream:chain(Nothing, Right)),
      Evens = emqx_utils_stream:filter(IsEven, Source),
      ?assertEqual([2, 4, 6], emqx_utils_stream:consume(Evens)).
  %% Dropping 2 elements from a 5-element stream must leave the last 3.
  drop_test() ->
      Source = emqx_utils_stream:list([1, 2, 3, 4, 5]),
      Remaining = emqx_utils_stream:drop(2, Source),
      ?assertEqual([3, 4, 5], emqx_utils_stream:consume(Remaining)).
  %% `foreach/2` must invoke the callback once per element, in order.
  %% The callback forwards every element to the test process; the mailbox is
  %% then read back through an `mqueue/1` stream to verify what was sent.
  foreach_test() ->
      TestPid = self(),
      Forward = fun(Item) -> erlang:send(TestPid, Item) end,
      Left = emqx_utils_stream:list([1, 2, 3]),
      Nothing = emqx_utils_stream:empty(),
      Right = emqx_utils_stream:list([4, 5, 6]),
      Source = emqx_utils_stream:chain(Left, emqx_utils_stream:chain(Nothing, Right)),
      ok = emqx_utils_stream:foreach(Forward, Source),
      %% NOTE(review): 100 is presumably a receive timeout in milliseconds — confirm.
      Received = emqx_utils_stream:consume(emqx_utils_stream:mqueue(100)),
      ?assertEqual([1, 2, 3, 4, 5, 6], Received).
  %% `chainmap/2` must replace each element with the stream returned by the
  %% callback and concatenate the results.
  %% Odd N expands to a stream of N and N + 1, even N expands to nothing, so
  %% the input 1..6 must come out as 1..6 again.
  chainmap_test() ->
      Expand = fun(N) ->
          case N rem 2 of
              1 ->
                  %% `[]` is passed directly as the second stream of the inner chain.
                  Single = emqx_utils_stream:chain(emqx_utils_stream:list([N]), []),
                  emqx_utils_stream:chain(Single, emqx_utils_stream:list([N + 1]));
              0 ->
                  emqx_utils_stream:empty()
          end
      end,
      Left = emqx_utils_stream:list([1, 2, 3]),
      Nothing = emqx_utils_stream:empty(),
      Right = emqx_utils_stream:list([4, 5, 6]),
      Source = emqx_utils_stream:chain(Left, emqx_utils_stream:chain(Nothing, Right)),
      Expanded = emqx_utils_stream:chainmap(Expand, Source),
      ?assertEqual([1, 2, 3, 4, 5, 6], emqx_utils_stream:consume(Expanded)).
  %% Transposing two streams must pair up their elements position by position
  %% and stop once the shorter stream (3 elements) runs out.
  transpose_test() ->
      Short = emqx_utils_stream:list([1, 2, 3]),
      Long = emqx_utils_stream:list([4, 5, 6, 7]),
      Transposed = emqx_utils_stream:transpose([Short, Long]),
      ?assertEqual(
          [[1, 4], [2, 5], [3, 6]],
          emqx_utils_stream:consume(Transposed)
      ).
  %% Transposing an empty list of streams must produce an empty stream.
  transpose_none_test() ->
      Transposed = emqx_utils_stream:transpose([]),
      ?assertEqual([], emqx_utils_stream:consume(Transposed)).
  %% Transposing a single stream must wrap each element in a one-element list.
  transpose_one_test() ->
      Only = emqx_utils_stream:list([1, 2, 3]),
      Transposed = emqx_utils_stream:transpose([Only]),
      ?assertEqual([[1], [2], [3]], emqx_utils_stream:consume(Transposed)).
  %% Transposing three streams of different lengths must stop as soon as the
  %% shortest one (2 elements) is exhausted.
  transpose_many_test() ->
      Streams = [
          emqx_utils_stream:list([1, 2, 3]),
          emqx_utils_stream:list([4, 5, 6, 7]),
          emqx_utils_stream:list([8, 9])
      ],
      Transposed = emqx_utils_stream:transpose(Streams),
      ?assertEqual(
          [[1, 4, 8], [2, 5, 9]],
          emqx_utils_stream:consume(Transposed)
      ).
  %% When one of the transposed streams is empty, the result must be empty
  %% regardless of the other streams' contents.
  transpose_many_empty_test() ->
      Streams = [
          emqx_utils_stream:list([1, 2, 3]),
          emqx_utils_stream:list([4, 5, 6, 7]),
          emqx_utils_stream:empty()
      ],
      Transposed = emqx_utils_stream:transpose(Streams),
      ?assertEqual([], emqx_utils_stream:consume(Transposed)).
  %% A repeated stream must cycle through its source indefinitely:
  %% taking 8 elements twice in a row continues the 1,2,3 cycle exactly where
  %% the previous batch stopped.
  repeat_test() ->
      Cycle = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])),
      FirstBatch = emqx_utils_stream:consume(8, Cycle),
      ?assertMatch({[1, 2, 3, 1, 2, 3, 1, 2], _}, FirstBatch),
      %% Consume again from the same starting stream to get hold of the remainder.
      {_, Rest} = emqx_utils_stream:consume(8, Cycle),
      SecondBatch = emqx_utils_stream:consume(8, Rest),
      ?assertMatch({[3, 1, 2, 3, 1, 2, 3, 1], _}, SecondBatch).
  %% Repeating an empty source must not loop forever: a bounded consume
  %% returns the bare empty list.
  repeat_empty_test() ->
      Source = emqx_utils_stream:list([]),
      Cycle = emqx_utils_stream:repeat(Source),
      ?assertEqual([], emqx_utils_stream:consume(8, Cycle)).
  %% Transposing a repeated stream with a finite one must end when the finite
  %% stream (5 elements) is exhausted, with the repeated side cycling 1,2.
  transpose_repeat_test() ->
      Cycle = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])),
      Finite = emqx_utils_stream:list([4, 5, 6, 7, 8]),
      Transposed = emqx_utils_stream:transpose([Cycle, Finite]),
      ?assertEqual(
          [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]],
          emqx_utils_stream:consume(Transposed)
      ).
  %% An `mqueue/1` stream must yield messages in the order they arrive in the
  %% test process mailbox. Three timers are started with delays of 1, 100 and
  %% 20 ms, so the payloads are expected in delay order: 1, 42, 2.
  mqueue_test() ->
      %% {DelayMs, Payload}, listed in the order the timers are started.
      Timers = [{1, 1}, {100, 2}, {20, 42}],
      lists:foreach(
          fun({DelayMs, Payload}) ->
              _ = erlang:send_after(DelayMs, self(), Payload)
          end,
          Timers
      ),
      %% NOTE(review): 400 is presumably a receive timeout in milliseconds — confirm.
      Received = emqx_utils_stream:consume(emqx_utils_stream:mqueue(400)),
      ?assertEqual([1, 42, 2], Received).
  %% Interleaving two finite streams in batches of 2, with the last argument
  %% set to `true`, must emit all elements of both streams: once the shorter
  %% stream runs out, the rest of the longer one still follows.
  interleave_test() ->
      Numbers = emqx_utils_stream:list([1, 2, 3]),
      Atoms = emqx_utils_stream:list([a, b, c, d]),
      Interleaved = emqx_utils_stream:interleave([{2, Numbers}, {2, Atoms}], true),
      ?assertEqual(
          [1, 2, a, b, 3, c, d],
          emqx_utils_stream:consume(Interleaved)
      ).
  %% Interleaving a `const/1` stream with a finite one in batches of 2, with
  %% the last argument set to `false`, must terminate: the expected output
  %% ends after the batch of 1s that follows the finite stream's last batch.
  interleave_stop_test() ->
      Ones = emqx_utils_stream:const(1),
      Atoms = emqx_utils_stream:list([a, b, c, d]),
      Interleaved = emqx_utils_stream:interleave([{2, Ones}, {2, Atoms}], false),
      ?assertEqual(
          [1, 1, a, b, 1, 1, c, d, 1, 1],
          emqx_utils_stream:consume(Interleaved)
      ).
  %% An `ets/1` stream must yield every object of the table, in order.
  %% The callback is called with `undefined` to start the traversal (fetching
  %% objects in chunks of 4 via `ets:match_object/3`) and with the returned
  %% continuation for every following chunk.
  %% The table is deleted in an `after` clause so it does not outlive the
  %% test, even when the assertion fails.
  ets_test() ->
      T = ets:new(tab, [ordered_set]),
      try
          Objects = [{N, N} || N <- lists:seq(1, 10)],
          true = ets:insert(T, Objects),
          S = emqx_utils_stream:ets(
              fun
                  (undefined) -> ets:match_object(T, '_', 4);
                  (Cont) -> ets:match_object(Cont)
              end
          ),
          ?assertEqual(
              Objects,
              emqx_utils_stream:consume(S)
          )
      after
          ets:delete(T)
      end.
  %% Exercises the CSV stream on four inputs:
  %% * CRLF-separated lines without a trailing newline;
  %% * LF-separated lines with spaces after the commas and a trailing newline
  %%   (expected to give the same maps as the first input);
  %% * an empty binary, which must give no rows;
  %% * a final row with fewer fields than the header, which must raise
  %%   `error:bad_format`.
  csv_test() ->
      ExpectedRows = [
          #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
          #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
      ],
      CrLfData = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
      ?assertEqual(
          ExpectedRows,
          emqx_utils_stream:consume(emqx_utils_stream:csv(CrLfData))
      ),
      SpacedData = <<"h1, h2, h3\nvv1, vv2, vv3\nvv4,vv5,vv6\n">>,
      ?assertEqual(
          ExpectedRows,
          emqx_utils_stream:consume(emqx_utils_stream:csv(SpacedData))
      ),
      ?assertEqual(
          [],
          emqx_utils_stream:consume(emqx_utils_stream:csv(<<"">>))
      ),
      ShortRowData = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5">>,
      ?assertException(
          error,
          bad_format,
          emqx_utils_stream:consume(emqx_utils_stream:csv(ShortRowData))
      ).