emqx_utils_stream.erl 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_utils_stream).
  17. %% Constructors / Combinators
  18. -export([
  19. empty/0,
  20. list/1,
  21. mqueue/1,
  22. map/2,
  23. transpose/1,
  24. chain/2,
  25. repeat/1
  26. ]).
  27. %% Evaluating
  28. -export([
  29. next/1,
  30. consume/1,
  31. consume/2
  32. ]).
  33. %% Streams from ETS tables
  34. -export([
  35. ets/1
  36. ]).
  37. %% Streams from .csv data
  38. -export([
  39. csv/1
  40. ]).
  41. -export_type([stream/1]).
  42. %% @doc A stream is essentially a lazy list.
  43. -type stream(T) :: fun(() -> next(T) | []).
  44. -type next(T) :: nonempty_improper_list(T, stream(T)).
  45. -dialyzer(no_improper_lists).
  46. -elvis([{elvis_style, nesting_level, disable}]).
  47. %%
  %% @doc Return a stream that is exhausted from the very start:
%% forcing it (calling the returned fun) always yields `[]`.
-spec empty() -> stream(none()).
empty() ->
    fun empty_next/0.

%% Forcing function of the empty stream.
empty_next() ->
    [].
  %% @doc Lift a list into a stream that yields its elements in order.
%% This is the inverse of `consume/1`: `consume(list(L))` gives back `L`.
-spec list([T]) -> stream(T).
list(Elements) when is_list(Elements) ->
    fun() ->
        case Elements of
            [] -> [];
            [Head | Tail] -> [Head | list(Tail)]
        end
    end.
  %% @doc Make a stream out of the message queue of the process that forces it.
%% Each time the stream is forced, the first message in the mailbox (whatever it is)
%% is received, i.e. removed from the mailbox, and yielded. If no message arrives
%% within `Timeout`, the stream ends.
-spec mqueue(timeout()) -> stream(any()).
mqueue(Timeout) ->
    fun() -> mqueue_next(Timeout) end.

%% Receive a single message, or end the stream once `Timeout` elapses.
mqueue_next(Timeout) ->
    receive
        Message ->
            [Message | mqueue(Timeout)]
    after Timeout ->
        []
    end.
  %% @doc Lazily apply `F` to every element of stream `S`.
%% `F` is only invoked when the corresponding element of the resulting stream is forced.
-spec map(fun((X) -> Y), stream(X)) -> stream(Y).
map(F, S) ->
    fun() ->
        case S() of
            [] ->
                [];
            [Elem | Rest] ->
                [F(Elem) | map(F, Rest)]
        end
    end.
  %% @doc Transpose a list of streams into a stream of lists.
%% The Nth element of the result is the list of the Nth elements of the input streams,
%% in input order. The result ends as soon as any input stream ends, i.e. it is as long
%% as the shortest input. `transpose([])` is an empty stream.
-spec transpose([stream(X)]) -> stream([X]).
transpose([]) ->
    fun() -> [] end;
transpose([S]) ->
    transpose_single(S);
transpose([S | Streams]) ->
    transpose_tail(S, transpose(Streams)).

%% Wrap every element of a single stream into a one-element list.
transpose_single(S) ->
    fun() ->
        case S() of
            [] -> [];
            [X | Rest] -> [[X] | transpose_single(Rest)]
        end
    end.

%% Prepend each element of `S` to the corresponding list produced by `Tail`.
%% `S` is forced first; `Tail` is only forced when `S` produced a value.
transpose_tail(S, Tail) ->
    fun() ->
        case S() of
            [] ->
                [];
            [X | SRest] ->
                case Tail() of
                    [] -> [];
                    [Xs | TailRest] -> [[X | Xs] | transpose_tail(SRest, TailRest)]
                end
        end
    end.
  %% @doc Concatenate two streams.
%% All elements of `SFirst` come first; `SThen` is not forced until `SFirst` is exhausted.
-spec chain(stream(X), stream(Y)) -> stream(X | Y).
chain(SFirst, SThen) ->
    fun() ->
        case SFirst() of
            [] ->
                SThen();
            [Elem | SRest] ->
                [Elem | chain(SRest, SThen)]
        end
    end.
  %% @doc Make an infinite stream that replays the given stream over and over.
%% Each pass starts by forcing `S` from the beginning again. If `S` yields nothing
%% when (re)started, the resulting stream ends, so `repeat` of an empty stream is empty.
-spec repeat(stream(X)) -> stream(X).
repeat(S) ->
    fun() ->
        case S() of
            [] ->
                [];
            [Elem | SRest] ->
                [Elem | repeat_rest(SRest, S)]
        end
    end.

%% Yield the remainder of the current pass over `S`, then start a new pass.
repeat_rest(Current, S) ->
    fun() ->
        case Current() of
            [] ->
                (repeat(S))();
            [Elem | Rest] ->
                [Elem | repeat_rest(Rest, S)]
        end
    end.
  128. %%
  %% @doc Force a stream once.
%% Returns `[]` if the stream is exhausted, otherwise `[Value | RestOfStream]`.
-spec next(stream(T)) -> next(T) | [].
next(Stream) ->
    Stream().
  %% @doc Force the whole stream and collect every value it produces, in order.
%% Does not return for an infinite stream.
-spec consume(stream(T)) -> [T].
consume(S) ->
    consume_acc(S, []).

%% Accumulate produced values in reverse, then restore their order at the end.
consume_acc(S, Acc) ->
    case S() of
        [] ->
            lists:reverse(Acc);
        [Elem | Rest] ->
            consume_acc(Rest, [Elem | Acc])
    end.
  %% @doc Take up to `N` values from the front of stream `S`.
%% * If the stream produces at least `N` values, return `{Values, Rest}`, where `Values`
%%   has exactly `N` elements and `Rest` is the remainder of the stream (possibly exhausted).
%% * If the stream is exhausted before `N` values are produced, return just the list of
%%   values it did produce (a bare list, not a tuple).
%% `N` must be a non-negative integer. Anything else fails with `function_clause`.
%% Without that check, such an `N` never matches the `0` clause, so it would silently
%% drain the stream, or loop forever on an infinite one.
-spec consume(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T].
consume(N, S) when is_integer(N), N >= 0 ->
    consume(N, S, []).

consume(0, S, Acc) ->
    {lists:reverse(Acc), S};
consume(N, S, Acc) ->
    case S() of
        [X | SRest] ->
            consume(N - 1, SRest, [X | Acc]);
        [] ->
            lists:reverse(Acc)
    end.
  156. %%
  %% Result of one chunk of a chunked ETS scan.
-type select_result(Record, Cont) ::
    {[Record], Cont}
    | {[Record], '$end_of_table'}
    | '$end_of_table'.

%% @doc Make a stream out of an ETS table that is scanned through in chunks.
%% `ContF` is called with `undefined` for the first chunk. Each later call receives the
%% continuation returned by the previous call. It is assumed to return a result of one of:
%% * `ets:select/1` / `ets:select/3`
%% * `ets:match/1` / `ets:match/3`
%% * `ets:match_object/1` / `ets:match_object/3`
%% (presumably `ContF(undefined)` starts the scan with the `/3` variant and later calls
%% resume it with the `/1` variant; confirm against callers).
%% Chunks are fetched lazily: `ContF` is only called again after every record of the
%% previous chunk has been consumed.
-spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record).
ets(ContF) ->
    ets(undefined, ContF).

ets(Cont, ContF) ->
    fun() ->
        case ContF(Cont) of
            '$end_of_table' ->
                [];
            {Records, '$end_of_table'} ->
                ets_emit(Records, fun() -> [] end);
            {Records, NCont} ->
                ets_emit(Records, ets(NCont, ContF))
        end
    end.

%% Yield the records of the current chunk one by one, then continue with `Next`.
ets_emit([], Next) ->
    Next();
ets_emit([Record | Records], Next) ->
    [Record | fun() -> ets_emit(Records, Next) end].
  %% @doc Make a stream of maps out of a .csv binary that is loaded in all at once.
%% The binary is assumed to be UTF-8 encoded and to start with a header row. Each
%% following non-empty line becomes one map from header name to field value.
%% A leading UTF-8 byte order mark (written e.g. by spreadsheet exporters) is stripped,
%% so it does not end up glued to the first header name.
%% Lines may be separated by `\n`, `\r\n` or `\r`; empty lines are skipped.
%% Fields are split by `csv_read_line/1`, i.e. by `,` and by spaces.
%% Raises `error(bad_format)` when the stream is forced onto a line whose number of
%% fields differs from the number of headers.
-spec csv(binary()) -> stream(map()).
csv(Bin) when is_binary(Bin) ->
    Reader = fun Iter(Headers, Lines) ->
        case csv_read_line(Lines) of
            {Fields, Rest} when length(Fields) =:= length(Headers) ->
                Row = maps:from_list(lists:zip(Headers, Fields)),
                [Row | fun() -> Iter(Headers, Rest) end];
            {_Fields, _Rest} ->
                error(bad_format);
            eof ->
                []
        end
    end,
    HeadersAndLines = binary:split(
        csv_strip_bom(Bin), [<<"\r">>, <<"\n">>], [global, trim_all]
    ),
    case csv_read_line(HeadersAndLines) of
        {CSVHeaders, CSVLines} ->
            fun() -> Reader(CSVHeaders, CSVLines) end;
        eof ->
            empty()
    end.

%% Drop a leading UTF-8 byte order mark (bytes EF BB BF), if present.
csv_strip_bom(<<16#EF, 16#BB, 16#BF, Rest/binary>>) ->
    Rest;
csv_strip_bom(Bin) ->
    Bin.
  %% Split the first of the remaining lines into fields.
%% Returns `{Fields, RemainingLines}`, or `eof` when no lines are left.
%% XXX: fields are separated by `,` AND by spaces, so a field value cannot contain ' '.
%% Because of `trim_all`, empty fields are dropped rather than kept as `<<>>`.
csv_read_line([]) ->
    eof;
csv_read_line([Line | Lines]) ->
    Separators = [<<",">>, <<" ">>, <<"\n">>],
    {binary:split(Line, Separators, [global, trim_all]), Lines}.