emqx_utils_stream_tests.erl 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_stream_tests).
  17. -include_lib("eunit/include/eunit.hrl").
  18. empty_test() ->
  19. S = emqx_utils_stream:empty(),
  20. ?assertEqual([], emqx_utils_stream:next(S)).
  21. empty_consume_test() ->
  22. S = emqx_utils_stream:empty(),
  23. ?assertEqual([], emqx_utils_stream:consume(S)).
  24. chain_empties_test() ->
  25. S = emqx_utils_stream:chain(
  26. emqx_utils_stream:empty(),
  27. emqx_utils_stream:empty()
  28. ),
  29. ?assertEqual([], emqx_utils_stream:next(S)).
  30. chain_list_test() ->
  31. S = emqx_utils_stream:chain(
  32. emqx_utils_stream:list([1, 2, 3]),
  33. emqx_utils_stream:list([4, 5, 6])
  34. ),
  35. ?assertEqual(
  36. [1, 2, 3, 4, 5, 6],
  37. emqx_utils_stream:consume(S)
  38. ).
  39. chain_take_test() ->
  40. S = emqx_utils_stream:chain(
  41. emqx_utils_stream:list([1, 2, 3]),
  42. emqx_utils_stream:list([4, 5, 6, 7, 8])
  43. ),
  44. ?assertMatch(
  45. {[1, 2, 3, 4, 5], _SRest},
  46. emqx_utils_stream:consume(5, S)
  47. ),
  48. {_, SRest} = emqx_utils_stream:consume(5, S),
  49. ?assertEqual(
  50. [6, 7, 8],
  51. emqx_utils_stream:consume(5, SRest)
  52. ).
  53. chain_list_map_test() ->
  54. S = emqx_utils_stream:map(
  55. fun integer_to_list/1,
  56. emqx_utils_stream:chain(
  57. emqx_utils_stream:list([1, 2, 3]),
  58. emqx_utils_stream:chain(
  59. emqx_utils_stream:empty(),
  60. emqx_utils_stream:list([4, 5, 6])
  61. )
  62. )
  63. ),
  64. ?assertEqual(
  65. ["1", "2", "3", "4", "5", "6"],
  66. emqx_utils_stream:consume(S)
  67. ).
  68. transpose_test() ->
  69. S = emqx_utils_stream:transpose([
  70. emqx_utils_stream:list([1, 2, 3]),
  71. emqx_utils_stream:list([4, 5, 6, 7])
  72. ]),
  73. ?assertEqual(
  74. [[1, 4], [2, 5], [3, 6]],
  75. emqx_utils_stream:consume(S)
  76. ).
  77. transpose_none_test() ->
  78. ?assertEqual(
  79. [],
  80. emqx_utils_stream:consume(emqx_utils_stream:transpose([]))
  81. ).
  82. transpose_one_test() ->
  83. S = emqx_utils_stream:transpose([emqx_utils_stream:list([1, 2, 3])]),
  84. ?assertEqual(
  85. [[1], [2], [3]],
  86. emqx_utils_stream:consume(S)
  87. ).
  88. transpose_many_test() ->
  89. S = emqx_utils_stream:transpose([
  90. emqx_utils_stream:list([1, 2, 3]),
  91. emqx_utils_stream:list([4, 5, 6, 7]),
  92. emqx_utils_stream:list([8, 9])
  93. ]),
  94. ?assertEqual(
  95. [[1, 4, 8], [2, 5, 9]],
  96. emqx_utils_stream:consume(S)
  97. ).
  98. transpose_many_empty_test() ->
  99. S = emqx_utils_stream:transpose([
  100. emqx_utils_stream:list([1, 2, 3]),
  101. emqx_utils_stream:list([4, 5, 6, 7]),
  102. emqx_utils_stream:empty()
  103. ]),
  104. ?assertEqual(
  105. [],
  106. emqx_utils_stream:consume(S)
  107. ).
  108. repeat_test() ->
  109. S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])),
  110. ?assertMatch(
  111. {[1, 2, 3, 1, 2, 3, 1, 2], _},
  112. emqx_utils_stream:consume(8, S)
  113. ),
  114. {_, SRest} = emqx_utils_stream:consume(8, S),
  115. ?assertMatch(
  116. {[3, 1, 2, 3, 1, 2, 3, 1], _},
  117. emqx_utils_stream:consume(8, SRest)
  118. ).
  119. repeat_empty_test() ->
  120. S = emqx_utils_stream:repeat(emqx_utils_stream:list([])),
  121. ?assertEqual(
  122. [],
  123. emqx_utils_stream:consume(8, S)
  124. ).
  125. transpose_repeat_test() ->
  126. S = emqx_utils_stream:transpose([
  127. emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])),
  128. emqx_utils_stream:list([4, 5, 6, 7, 8])
  129. ]),
  130. ?assertEqual(
  131. [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]],
  132. emqx_utils_stream:consume(S)
  133. ).
  134. mqueue_test() ->
  135. _ = erlang:send_after(1, self(), 1),
  136. _ = erlang:send_after(100, self(), 2),
  137. _ = erlang:send_after(20, self(), 42),
  138. ?assertEqual(
  139. [1, 42, 2],
  140. emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
  141. ).
  142. csv_test() ->
  143. Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
  144. ?assertEqual(
  145. [
  146. #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
  147. #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
  148. ],
  149. emqx_utils_stream:consume(emqx_utils_stream:csv(Data1))
  150. ),
  151. Data2 = <<"h1, h2, h3\nvv1, vv2, vv3\nvv4,vv5,vv6\n">>,
  152. ?assertEqual(
  153. [
  154. #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
  155. #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
  156. ],
  157. emqx_utils_stream:consume(emqx_utils_stream:csv(Data2))
  158. ),
  159. ?assertEqual(
  160. [],
  161. emqx_utils_stream:consume(emqx_utils_stream:csv(<<"">>))
  162. ),
  163. BadData = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5">>,
  164. ?assertException(
  165. error,
  166. bad_format,
  167. emqx_utils_stream:consume(emqx_utils_stream:csv(BadData))
  168. ).