emqx_utils_stream_tests.erl 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_stream_tests).
  17. -include_lib("eunit/include/eunit.hrl").
  18. empty_test() ->
  19. S = emqx_utils_stream:empty(),
  20. ?assertEqual([], emqx_utils_stream:next(S)).
  21. empty_consume_test() ->
  22. S = emqx_utils_stream:empty(),
  23. ?assertEqual([], emqx_utils_stream:consume(S)).
  24. chain_empties_test() ->
  25. S = emqx_utils_stream:chain(
  26. emqx_utils_stream:empty(),
  27. emqx_utils_stream:empty()
  28. ),
  29. ?assertEqual([], emqx_utils_stream:next(S)).
  30. chain_list_test() ->
  31. S = emqx_utils_stream:chain(
  32. emqx_utils_stream:list([1, 2, 3]),
  33. emqx_utils_stream:list([4, 5, 6])
  34. ),
  35. ?assertEqual(
  36. [1, 2, 3, 4, 5, 6],
  37. emqx_utils_stream:consume(S)
  38. ).
  39. chain_take_test() ->
  40. S = emqx_utils_stream:chain(
  41. emqx_utils_stream:list([1, 2, 3]),
  42. emqx_utils_stream:list([4, 5, 6, 7, 8])
  43. ),
  44. ?assertMatch(
  45. {[1, 2, 3, 4, 5], _SRest},
  46. emqx_utils_stream:consume(5, S)
  47. ),
  48. {_, SRest} = emqx_utils_stream:consume(5, S),
  49. ?assertEqual(
  50. [6, 7, 8],
  51. emqx_utils_stream:consume(5, SRest)
  52. ).
  53. chain_list_map_test() ->
  54. S = emqx_utils_stream:map(
  55. fun integer_to_list/1,
  56. emqx_utils_stream:chain(
  57. emqx_utils_stream:list([1, 2, 3]),
  58. emqx_utils_stream:chain(
  59. emqx_utils_stream:empty(),
  60. emqx_utils_stream:list([4, 5, 6])
  61. )
  62. )
  63. ),
  64. ?assertEqual(
  65. ["1", "2", "3", "4", "5", "6"],
  66. emqx_utils_stream:consume(S)
  67. ).
  68. mqueue_test() ->
  69. _ = erlang:send_after(1, self(), 1),
  70. _ = erlang:send_after(100, self(), 2),
  71. _ = erlang:send_after(20, self(), 42),
  72. ?assertEqual(
  73. [1, 42, 2],
  74. emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
  75. ).
  76. csv_test() ->
  77. Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
  78. ?assertEqual(
  79. [
  80. #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
  81. #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
  82. ],
  83. emqx_utils_stream:consume(emqx_utils_stream:csv(Data1))
  84. ),
  85. Data2 = <<"h1, h2, h3\nvv1, vv2, vv3\nvv4,vv5,vv6\n">>,
  86. ?assertEqual(
  87. [
  88. #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>},
  89. #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>}
  90. ],
  91. emqx_utils_stream:consume(emqx_utils_stream:csv(Data2))
  92. ),
  93. ?assertEqual(
  94. [],
  95. emqx_utils_stream:consume(emqx_utils_stream:csv(<<"">>))
  96. ),
  97. BadData = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5">>,
  98. ?assertException(
  99. error,
  100. bad_format,
  101. emqx_utils_stream:consume(emqx_utils_stream:csv(BadData))
  102. ).